Skip to main content

lash_core/
tool_provider.rs

1use std::collections::HashSet;
2use std::future::Future;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5
6use lash_sansio::llm::types::ProviderReplayMeta;
7use serde::{Deserialize, Serialize};
8
9use crate::plugin::{
10    PluginError, SessionGraphService, SessionLifecycleService, SessionSnapshot, SessionStateService,
11};
12use crate::{AttachmentStore, ToolContract, ToolId, ToolManifest, ToolResult};
13
14mod attachments;
15mod direct_completion;
16mod dispatch;
17mod process;
18pub(crate) mod process_events;
19mod session;
20mod triggers;
21
22pub use attachments::ToolAttachmentClient;
23pub use direct_completion::ToolDirectCompletionClient;
24pub use dispatch::ToolDispatchClient;
25pub use process::ToolSessionProcessAdmin;
26pub use process_events::ToolProcessEventClient;
27pub use session::{ToolSessionAdmin, ToolSessionModel};
28pub use triggers::ToolTriggerClient;
29
/// One progress message that the sandbox forwards to the host while a tool runs.
#[derive(Clone, Debug)]
pub struct SandboxMessage {
    /// Text carried by the message.
    pub text: String,
    /// Progress event kind rendered by the host, e.g. `"tool_output"`.
    pub kind: String,
}
37
38/// Sender for streaming progress messages from tools (e.g. live bash output).
39pub type ProgressSender = tokio::sync::mpsc::UnboundedSender<SandboxMessage>;
40
41#[derive(Clone, Default)]
42pub(crate) struct ToolCompletionState {
43    key: Arc<Mutex<Option<crate::AwaitEventKey>>>,
44}
45
46impl ToolCompletionState {
47    fn store(
48        &self,
49        key: crate::AwaitEventKey,
50    ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
51        let mut guard = self.key.lock().map_err(|_| {
52            crate::RuntimeError::new(
53                "tool_completion_state_poisoned",
54                "tool completion key state lock poisoned",
55            )
56        })?;
57        if let Some(existing) = guard.as_ref() {
58            return Ok(existing.clone());
59        }
60        *guard = Some(key.clone());
61        Ok(key)
62    }
63
64    pub(crate) fn take(&self) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
65        self.key.lock().map(|mut guard| guard.take()).map_err(|_| {
66            crate::RuntimeError::new(
67                "tool_completion_state_poisoned",
68                "tool completion key state lock poisoned",
69            )
70        })
71    }
72}
73
/// Shared bookkeeping for the durable effects issued by one tool call.
///
/// Both members sit behind `Arc`, so clones observe the same state.
#[derive(Clone, Default)]
pub(crate) struct ToolDurableEffectState {
    /// Step ids that have already been reserved.
    step_ids: Arc<Mutex<HashSet<String>>>,
    /// Counter handed out by `next_process_event_sequence`, starting at zero.
    process_event_sequence: Arc<AtomicU64>,
}
79
80impl ToolDurableEffectState {
81    fn reserve_step(&self, step_id: &str) -> Result<(), crate::RuntimeError> {
82        let mut guard = self.step_ids.lock().map_err(|_| {
83            crate::RuntimeError::new(
84                "durable_effect_state_poisoned",
85                "durable effect step state lock poisoned",
86            )
87        })?;
88        if !guard.insert(step_id.to_string()) {
89            return Err(crate::RuntimeError::new(
90                "durable_effect_duplicate_step_id",
91                format!("durable effect step id `{step_id}` was already used by this tool call"),
92            ));
93        }
94        Ok(())
95    }
96
97    fn next_process_event_sequence(&self) -> u64 {
98        self.process_event_sequence.fetch_add(1, Ordering::Relaxed)
99    }
100}
101
102/// Per-call environment for [`ToolProvider::execute`]. Fields are sealed so
103/// the runtime can add capabilities without breaking tool authors.
104#[derive(Clone)]
105pub struct ToolContext<'run> {
106    pub(crate) session_id: String,
107    pub(crate) agent_frame_id: crate::AgentFrameId,
108    pub(crate) sessions: Arc<dyn SessionStateService>,
109    pub(crate) session_lifecycle: Arc<dyn SessionLifecycleService>,
110    pub(crate) processes: Arc<dyn crate::ProcessService>,
111    pub(crate) process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
112    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
113    pub(crate) runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
114    pub(crate) runtime_execution_context: Option<crate::RuntimeExecutionContext<'run>>,
115    pub(crate) cancellation_token: Option<tokio_util::sync::CancellationToken>,
116    pub(crate) async_process_id: Option<String>,
117    pub(crate) runtime_process_id: Option<String>,
118    pub(crate) process_events: Option<ToolProcessEventContext>,
119    pub(crate) attachment_store: Arc<dyn AttachmentStore>,
120    pub(crate) direct_completions: crate::DirectCompletionClient<'run>,
121    pub(crate) prepared_payload: serde_json::Value,
122    /// The id of the in-flight tool call that is invoking this tool.
123    pub(crate) tool_call_id: Option<String>,
124    pub(crate) attempt_number: u32,
125    pub(crate) max_attempts: u32,
126    pub(crate) replay_key: Option<String>,
127    pub(crate) completion: ToolCompletionState,
128    pub(crate) durable_effects: ToolDurableEffectState,
129    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
130    pub(crate) execution_env_spec: crate::ProcessExecutionEnvSpec,
131    pub(crate) child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
132}
133
/// Event payload passed to a [`ToolChildExecutionTraceHook`] when a child
/// process start is reported.
#[derive(Clone)]
pub struct ToolChildProcessStarted {
    /// Id of the process that was started.
    pub process_id: String,
    /// Entry name of the child, when the reporter supplied one.
    pub child_entry_name: Option<String>,
}
139
140#[derive(Clone)]
141pub struct ToolChildExecutionTraceHook {
142    on_child_process_started: Arc<dyn Fn(ToolChildProcessStarted) + Send + Sync>,
143}
144
145impl ToolChildExecutionTraceHook {
146    pub fn new(
147        on_child_process_started: impl Fn(ToolChildProcessStarted) + Send + Sync + 'static,
148    ) -> Self {
149        Self {
150            on_child_process_started: Arc::new(on_child_process_started),
151        }
152    }
153
154    pub fn child_process_started(&self, event: ToolChildProcessStarted) {
155        (self.on_child_process_started)(event);
156    }
157}
158
159#[derive(Clone)]
160pub(crate) struct ToolProcessEventContext {
161    process_id: String,
162    registry: Arc<dyn crate::ProcessRegistry>,
163    store: Option<Arc<dyn crate::RuntimePersistence>>,
164    session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
165    session_graph: Arc<dyn SessionGraphService>,
166    queued_work_driver: Option<crate::QueuedWorkDriver>,
167}
168
169pub(crate) struct ToolContextBuilder<'run> {
170    session_id: String,
171    agent_frame_id: crate::AgentFrameId,
172    sessions: Arc<dyn SessionStateService>,
173    session_lifecycle: Arc<dyn SessionLifecycleService>,
174    session_graph: Arc<dyn SessionGraphService>,
175    processes: Arc<dyn crate::ProcessService>,
176    process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
177    effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
178    runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
179    runtime_execution_context: Option<crate::RuntimeExecutionContext<'run>>,
180    cancellation_token: Option<tokio_util::sync::CancellationToken>,
181    async_process_id: Option<String>,
182    runtime_process_id: Option<String>,
183    process_events: Option<ToolProcessEventContext>,
184    attachment_store: Arc<dyn AttachmentStore>,
185    direct_completions: crate::DirectCompletionClient<'run>,
186    prepared_payload: serde_json::Value,
187    tool_call_id: Option<String>,
188    completion: ToolCompletionState,
189    durable_effects: ToolDurableEffectState,
190    parent_invocation: Option<crate::RuntimeInvocation>,
191    execution_env_spec: crate::ProcessExecutionEnvSpec,
192    child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
193}
194
195impl<'run> ToolContextBuilder<'run> {
196    pub(crate) fn from_dispatch(
197        dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
198    ) -> Self {
199        Self {
200            session_id: dispatch.session_id.clone(),
201            agent_frame_id: dispatch.agent_frame_id.clone(),
202            sessions: Arc::clone(&dispatch.sessions),
203            session_lifecycle: Arc::clone(&dispatch.session_lifecycle),
204            session_graph: Arc::clone(&dispatch.session_graph),
205            processes: Arc::clone(&dispatch.processes),
206            process_cancel_ability: Arc::clone(&dispatch.process_cancel_ability),
207            effect_controller: dispatch.effect_controller.clone(),
208            runtime_dispatch: Some(Arc::clone(&dispatch)),
209            runtime_execution_context: None,
210            cancellation_token: None,
211            async_process_id: None,
212            runtime_process_id: None,
213            process_events: None,
214            attachment_store: Arc::clone(&dispatch.attachment_store),
215            direct_completions: dispatch.direct_completions.clone(),
216            prepared_payload: serde_json::Value::Null,
217            tool_call_id: None,
218            completion: ToolCompletionState::default(),
219            durable_effects: ToolDurableEffectState::default(),
220            parent_invocation: dispatch.parent_invocation.clone(),
221            execution_env_spec: dispatch.execution_env_spec.clone(),
222            child_execution_trace_hook: None,
223        }
224    }
225
226    #[cfg(any(test, feature = "testing"))]
227    pub(crate) fn tool_call_id(mut self, tool_call_id: impl Into<Option<String>>) -> Self {
228        self.tool_call_id = tool_call_id.into();
229        self
230    }
231
232    pub(crate) fn prepared_call(mut self, call: &PreparedToolCall) -> Self {
233        self.tool_call_id = Some(call.call_id.clone());
234        self.prepared_payload = call.prepared_payload.clone();
235        self
236    }
237
238    pub(crate) fn cancellation_token(
239        mut self,
240        cancellation_token: Option<tokio_util::sync::CancellationToken>,
241    ) -> Self {
242        self.cancellation_token = cancellation_token;
243        self
244    }
245
246    pub(crate) fn runtime_execution_context(
247        mut self,
248        context: crate::RuntimeExecutionContext<'run>,
249    ) -> Self {
250        self.runtime_execution_context = Some(context);
251        self
252    }
253
254    pub(crate) fn runtime_process_id(mut self, process_id: Option<String>) -> Self {
255        self.runtime_process_id = process_id;
256        self
257    }
258
259    pub(crate) fn async_process(
260        mut self,
261        process_id: impl Into<String>,
262        cancellation_token: tokio_util::sync::CancellationToken,
263    ) -> Self {
264        self.async_process_id = Some(process_id.into());
265        self.cancellation_token = Some(cancellation_token);
266        self
267    }
268
269    pub(crate) fn process_events(
270        mut self,
271        process_id: impl Into<String>,
272        registry: Arc<dyn crate::ProcessRegistry>,
273        store: Option<Arc<dyn crate::RuntimePersistence>>,
274        session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
275        queued_work_driver: Option<crate::QueuedWorkDriver>,
276    ) -> Self {
277        self.process_events = Some(ToolProcessEventContext {
278            process_id: process_id.into(),
279            registry,
280            store,
281            session_store_factory,
282            session_graph: Arc::clone(&self.session_graph),
283            queued_work_driver,
284        });
285        self
286    }
287
288    pub(crate) fn parent_invocation(mut self, metadata: Option<crate::RuntimeInvocation>) -> Self {
289        self.parent_invocation = metadata;
290        self
291    }
292
293    pub(crate) fn child_execution_trace_hook(
294        mut self,
295        hook: Option<ToolChildExecutionTraceHook>,
296    ) -> Self {
297        self.child_execution_trace_hook = hook;
298        self
299    }
300
301    pub(crate) fn build(self) -> ToolContext<'run> {
302        ToolContext {
303            session_id: self.session_id,
304            agent_frame_id: self.agent_frame_id,
305            sessions: self.sessions,
306            session_lifecycle: self.session_lifecycle,
307            processes: self.processes,
308            process_cancel_ability: self.process_cancel_ability,
309            effect_controller: self.effect_controller,
310            runtime_dispatch: self.runtime_dispatch,
311            runtime_execution_context: self.runtime_execution_context,
312            cancellation_token: self.cancellation_token,
313            async_process_id: self.async_process_id,
314            runtime_process_id: self.runtime_process_id,
315            process_events: self.process_events,
316            attachment_store: self.attachment_store,
317            direct_completions: self.direct_completions,
318            prepared_payload: self.prepared_payload,
319            tool_call_id: self.tool_call_id,
320            attempt_number: 1,
321            max_attempts: 1,
322            replay_key: None,
323            completion: self.completion,
324            durable_effects: self.durable_effects,
325            parent_invocation: self.parent_invocation,
326            execution_env_spec: self.execution_env_spec,
327            child_execution_trace_hook: self.child_execution_trace_hook,
328        }
329    }
330}
331
332impl<'run> ToolContext<'run> {
333    #[cfg(any(test, feature = "testing"))]
334    #[expect(
335        clippy::too_many_arguments,
336        reason = "testing constructor mirrors the sealed runtime tool context dependencies"
337    )]
338    pub(crate) fn builder(
339        session_id: String,
340        sessions: Arc<dyn SessionStateService>,
341        session_lifecycle: Arc<dyn SessionLifecycleService>,
342        session_graph: Arc<dyn SessionGraphService>,
343        processes: Arc<dyn crate::ProcessService>,
344        process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
345        effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
346        attachment_store: Arc<dyn AttachmentStore>,
347        direct_completions: crate::DirectCompletionClient<'run>,
348    ) -> ToolContextBuilder<'run> {
349        ToolContextBuilder {
350            session_id,
351            agent_frame_id: String::new(),
352            sessions,
353            session_lifecycle,
354            session_graph,
355            processes,
356            process_cancel_ability,
357            effect_controller,
358            runtime_dispatch: None,
359            runtime_execution_context: None,
360            cancellation_token: None,
361            async_process_id: None,
362            runtime_process_id: None,
363            process_events: None,
364            attachment_store,
365            direct_completions,
366            prepared_payload: serde_json::Value::Null,
367            tool_call_id: None,
368            completion: ToolCompletionState::default(),
369            durable_effects: ToolDurableEffectState::default(),
370            parent_invocation: None,
371            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
372                crate::PluginOptions::default(),
373                crate::SessionPolicy::default(),
374            ),
375            child_execution_trace_hook: None,
376        }
377    }
378
379    pub(crate) fn from_dispatch(
380        dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
381    ) -> ToolContextBuilder<'run> {
382        ToolContextBuilder::from_dispatch(dispatch)
383    }
384
385    pub fn session_id(&self) -> &str {
386        &self.session_id
387    }
388
389    pub fn agent_frame_id(&self) -> &str {
390        &self.agent_frame_id
391    }
392
393    pub fn sessions(&self) -> ToolSessionAdmin<'run> {
394        ToolSessionAdmin {
395            session_id: self.session_id.clone(),
396            sessions: Arc::clone(&self.sessions),
397            session_lifecycle: Arc::clone(&self.session_lifecycle),
398            effect_controller: self.effect_controller.clone(),
399        }
400    }
401
402    pub fn dispatch(&self) -> ToolDispatchClient<'run> {
403        ToolDispatchClient {
404            context: self.clone(),
405        }
406    }
407
408    pub fn triggers(&self) -> ToolTriggerClient<'run> {
409        ToolTriggerClient {
410            context: self.clone(),
411        }
412    }
413
414    pub fn processes(&self) -> ToolSessionProcessAdmin<'run> {
415        ToolSessionProcessAdmin {
416            session_id: self.session_id.clone(),
417            agent_frame_id: self.agent_frame_id.clone(),
418            processes: Arc::clone(&self.processes),
419            process_cancel_ability: Arc::clone(&self.process_cancel_ability),
420            effect_controller: self.effect_controller.clone(),
421            parent_invocation: self.parent_invocation.clone(),
422            tool_call_id: self.tool_call_id.clone(),
423            execution_env_spec: self.execution_env_spec.clone(),
424        }
425    }
426
427    pub fn emit_child_process_started(
428        &self,
429        process_id: impl Into<String>,
430        child_entry_name: Option<String>,
431    ) {
432        let Some(hook) = &self.child_execution_trace_hook else {
433            return;
434        };
435        hook.child_process_started(ToolChildProcessStarted {
436            process_id: process_id.into(),
437            child_entry_name,
438        });
439    }
440
441    pub fn direct_completions(&self) -> ToolDirectCompletionClient<'run> {
442        ToolDirectCompletionClient {
443            session_id: self.session_id.clone(),
444            tool_call_id: self.tool_call_id.clone(),
445            direct_completions: self.direct_completions.clone(),
446            parent_invocation: self.parent_invocation.clone(),
447            parent_tool_attempt_is_durable: self.parent_invocation.as_ref().is_some_and(
448                |invocation| {
449                    invocation.effect_kind() == Some(crate::RuntimeEffectKind::ToolAttempt)
450                },
451            ) && self
452                .effect_controller
453                .controller()
454                .durability_tier()
455                == crate::DurabilityTier::Durable,
456        }
457    }
458
459    pub fn attachments(&self) -> ToolAttachmentClient {
460        ToolAttachmentClient {
461            store: Arc::clone(&self.attachment_store),
462        }
463    }
464
465    pub fn process_events(&self) -> ToolProcessEventClient {
466        ToolProcessEventClient {
467            context: self.process_events.clone(),
468        }
469    }
470
471    /// Borrow this tool call's durable effect boundary.
472    ///
473    /// This is available only while executing a prepared tool call under a
474    /// controller that explicitly supports durable tool effects. The returned
475    /// facade records JSON steps and await-event waits in the caller's existing
476    /// effect log; it does not expose the underlying workflow engine context.
477    pub fn durable_effects(&self) -> Result<ToolDurableEffects<'_, 'run>, crate::RuntimeError> {
478        let Some(tool_call_id) = self.tool_call_id.as_deref() else {
479            return Err(crate::RuntimeError::new(
480                "durable_effects_missing_call_id",
481                "durable effects require a prepared tool call id",
482            ));
483        };
484        if tool_call_id.trim().is_empty() {
485            return Err(crate::RuntimeError::new(
486                "durable_effects_missing_call_id",
487                "durable effects require a non-empty prepared tool call id",
488            ));
489        }
490        let scoped = self.effect_controller.scoped();
491        if !scoped.controller().supports_durable_effects() {
492            return Err(crate::RuntimeError::new(
493                "durable_effects_unavailable",
494                "this effect controller does not support durable tool effects",
495            ));
496        }
497        if self.parent_invocation.as_ref().is_some_and(|invocation| {
498            invocation.effect_kind() == Some(crate::RuntimeEffectKind::ToolAttempt)
499        }) && scoped.controller().durability_tier() == crate::DurabilityTier::Durable
500        {
501            return Err(crate::RuntimeError::new(
502                "durable_effects_unavailable_in_tool_attempt",
503                "durable tool sub-effects are not available inside a journaled tool attempt",
504            ));
505        }
506        Ok(ToolDurableEffects { context: self })
507    }
508
509    pub fn cancellation_token(&self) -> Option<&tokio_util::sync::CancellationToken> {
510        self.cancellation_token.as_ref()
511    }
512
513    pub fn async_process_id(&self) -> Option<&str> {
514        self.async_process_id.as_deref()
515    }
516
517    pub fn runtime_process_id(&self) -> Option<&str> {
518        self.async_process_id
519            .as_deref()
520            .or(self.runtime_process_id.as_deref())
521            .or_else(|| {
522                self.process_events
523                    .as_ref()
524                    .map(|context| context.process_id.as_str())
525            })
526    }
527
528    pub fn tool_call_id(&self) -> Option<&str> {
529        self.tool_call_id.as_deref()
530    }
531
532    pub fn prepared_payload(&self) -> &serde_json::Value {
533        &self.prepared_payload
534    }
535
536    pub fn decode_prepared_payload<T>(&self) -> Result<T, serde_json::Error>
537    where
538        T: serde::de::DeserializeOwned,
539    {
540        serde_json::from_value(self.prepared_payload.clone())
541    }
542
543    pub fn attempt_number(&self) -> u32 {
544        self.attempt_number
545    }
546
547    pub fn max_attempts(&self) -> u32 {
548        self.max_attempts
549    }
550
551    pub fn replay_key(&self) -> Option<&str> {
552        self.replay_key.as_deref()
553    }
554
555    /// Obtain the durable completion key for this call, required before returning
556    /// [`ToolResult::Pending`](crate::ToolResult::Pending).
557    ///
558    /// A tool that defers its outcome (waiting on a webhook, human approval, or another
559    /// service) calls this, hands the returned [`AwaitEventKey`](crate::AwaitEventKey)
560    /// to whatever will complete the work out-of-band, and then returns
561    /// `ToolResult::Pending(..)`. The key names the durable wait the runtime parks the
562    /// call on; the external resolver delivers the result against it later.
563    ///
564    /// The key is stored on the context and consumed by the dispatcher when the tool
565    /// returns `Pending`. Returning `Pending` without first calling this fails the call
566    /// with `pending_tool_missing_completion_key`. Calls made outside a prepared tool
567    /// invocation (no tool call id) fail with `tool_completion_key_missing_call_id`.
568    pub async fn completion_key(&self) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
569        let tool_call_id = self.tool_call_id.clone().ok_or_else(|| {
570            crate::RuntimeError::new(
571                "tool_completion_key_missing_call_id",
572                "completion keys require a prepared tool call id",
573            )
574        })?;
575        let scoped = self.effect_controller.scoped();
576        let key = scoped
577            .controller()
578            .await_event_key(
579                scoped.execution_scope(),
580                crate::AwaitEventWaitIdentity::tool_completion(tool_call_id),
581            )
582            .await?;
583        self.completion.store(key)
584    }
585
586    pub(crate) fn take_completion_key(
587        &self,
588    ) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
589        self.completion.take()
590    }
591
592    pub fn with_async_process(
593        mut self,
594        process_id: impl Into<String>,
595        cancellation_token: tokio_util::sync::CancellationToken,
596    ) -> Self {
597        self.async_process_id = Some(process_id.into());
598        self.runtime_process_id = self.async_process_id.clone();
599        self.cancellation_token = Some(cancellation_token);
600        self
601    }
602
603    #[cfg(any(test, feature = "testing"))]
604    #[doc(hidden)]
605    pub fn with_process_events_for_testing(
606        mut self,
607        process_id: impl Into<String>,
608        registry: Arc<dyn crate::ProcessRegistry>,
609    ) -> Self {
610        self.process_events = Some(ToolProcessEventContext {
611            process_id: process_id.into(),
612            registry,
613            store: None,
614            session_store_factory: None,
615            session_graph: Arc::new(crate::plugin::NoopSessionManager),
616            queued_work_driver: None,
617        });
618        self
619    }
620
621    pub(crate) fn with_retry_context(
622        mut self,
623        tool_name: &str,
624        attempt_number: u32,
625        max_attempts: u32,
626    ) -> Self {
627        self.attempt_number = attempt_number.max(1);
628        self.max_attempts = max_attempts.max(1);
629        self.replay_key = self
630            .tool_call_id
631            .as_ref()
632            .map(|call_id| format!("lash-tool:{}:{call_id}:{tool_name}", self.session_id));
633        self
634    }
635
636    pub(crate) fn with_prepared_payload(mut self, payload: serde_json::Value) -> Self {
637        self.prepared_payload = payload;
638        self
639    }
640
641    /// Constructor reserved for `lash_core::testing` helpers. Do not call directly;
642    /// use [`lash_core::testing::mock_tool_context`] instead.
643    #[cfg(any(test, feature = "testing"))]
644    #[doc(hidden)]
645    #[expect(
646        clippy::too_many_arguments,
647        reason = "test-only constructor mirrors the sealed runtime tool context"
648    )]
649    pub fn __for_testing(
650        session_id: String,
651        sessions: Arc<dyn SessionStateService>,
652        session_lifecycle: Arc<dyn SessionLifecycleService>,
653        session_graph: Arc<dyn SessionGraphService>,
654        processes: Arc<dyn crate::ProcessService>,
655        attachment_store: Arc<dyn AttachmentStore>,
656        direct_completions: crate::DirectCompletionClient<'static>,
657        tool_call_id: Option<String>,
658    ) -> ToolContext<'static> {
659        ToolContext::builder(
660            session_id,
661            sessions,
662            session_lifecycle,
663            session_graph,
664            processes,
665            Arc::new(crate::DefaultProcessCancelAbility),
666            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
667                crate::InlineRuntimeEffectController,
668            )),
669            attachment_store,
670            direct_completions,
671        )
672        .tool_call_id(tool_call_id)
673        .build()
674    }
675
676    /// Constructor reserved for tests that need a custom process-cancel host
677    /// ability. Do not call directly; prefer public testing helpers when they
678    /// cover the case.
679    #[cfg(any(test, feature = "testing"))]
680    #[doc(hidden)]
681    #[expect(
682        clippy::too_many_arguments,
683        reason = "test-only constructor mirrors the sealed runtime context"
684    )]
685    pub fn __for_testing_with_process_cancel_ability(
686        session_id: String,
687        sessions: Arc<dyn SessionStateService>,
688        session_lifecycle: Arc<dyn SessionLifecycleService>,
689        session_graph: Arc<dyn SessionGraphService>,
690        processes: Arc<dyn crate::ProcessService>,
691        process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
692        attachment_store: Arc<dyn AttachmentStore>,
693        direct_completions: crate::DirectCompletionClient<'static>,
694        tool_call_id: Option<String>,
695    ) -> ToolContext<'static> {
696        ToolContext::builder(
697            session_id,
698            sessions,
699            session_lifecycle,
700            session_graph,
701            processes,
702            process_cancel_ability,
703            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
704                crate::InlineRuntimeEffectController,
705            )),
706            attachment_store,
707            direct_completions,
708        )
709        .tool_call_id(tool_call_id)
710        .build()
711    }
712}
713
714/// Durable effect operations available to advanced in-process tools.
715///
716/// The facade borrows the caller's existing runtime effect boundary. It records
717/// JSON-only local steps and awaits host-signed event keys without exposing
718/// Restate, Temporal, or any other workflow-native context.
719pub struct ToolDurableEffects<'ctx, 'run> {
720    context: &'ctx ToolContext<'run>,
721}
722
723impl<'ctx, 'run> ToolDurableEffects<'ctx, 'run> {
724    pub async fn run_json<F, Fut>(
725        &self,
726        step_id: impl Into<String>,
727        input: serde_json::Value,
728        run: F,
729    ) -> Result<serde_json::Value, crate::RuntimeError>
730    where
731        F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
732        Fut: Future<Output = Result<serde_json::Value, crate::RuntimeError>> + Send + 'run,
733    {
734        let step_id = step_id.into();
735        if step_id.trim().is_empty() {
736            return Err(crate::RuntimeError::new(
737                "durable_effect_empty_step_id",
738                "durable effect step id must be non-empty",
739            ));
740        }
741        self.context.durable_effects.reserve_step(&step_id)?;
742        let invocation = self.step_invocation(
743            format!("durable-step:{step_id}"),
744            crate::RuntimeEffectKind::DurableStep,
745            format!("durable-step:{step_id}"),
746        )?;
747        let outcome = self
748            .context
749            .effect_controller
750            .controller()
751            .execute_effect(
752                crate::RuntimeEffectEnvelope::new(
753                    invocation,
754                    crate::RuntimeEffectCommand::DurableStep {
755                        step_id,
756                        input: input.clone(),
757                    },
758                ),
759                crate::RuntimeEffectLocalExecutor::durable_step(run),
760            )
761            .await
762            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
763        outcome
764            .into_durable_step()
765            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)
766    }
767
768    pub async fn external_event_key(
769        &self,
770        key: impl Into<String>,
771    ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
772        let key = key.into();
773        if key.trim().is_empty() {
774            return Err(crate::RuntimeError::new(
775                "durable_effect_empty_event_key",
776                "durable effect external event key must be non-empty",
777            ));
778        }
779        let scoped = self.context.effect_controller.scoped();
780        scoped
781            .controller()
782            .await_event_key(
783                scoped.execution_scope(),
784                crate::AwaitEventWaitIdentity::Custom { key },
785            )
786            .await
787    }
788
789    pub async fn await_event_json(
790        &self,
791        key: crate::AwaitEventKey,
792    ) -> Result<serde_json::Value, crate::RuntimeError> {
793        let invocation = self.step_invocation(
794            format!("await-event:{}", key.key_id),
795            crate::RuntimeEffectKind::AwaitEvent,
796            format!("await-event:{}", key.key_id),
797        )?;
798        let cancellation = self.context.cancellation_token.clone().unwrap_or_default();
799        let clock = self
800            .context
801            .runtime_dispatch
802            .as_ref()
803            .map(|dispatch| Arc::clone(&dispatch.clock))
804            .unwrap_or_else(|| Arc::new(crate::SystemClock));
805        let outcome = self
806            .context
807            .effect_controller
808            .controller()
809            .execute_effect(
810                crate::RuntimeEffectEnvelope::new(
811                    invocation,
812                    crate::RuntimeEffectCommand::AwaitEvent { key },
813                ),
814                crate::RuntimeEffectLocalExecutor::await_event_with_clock(
815                    cancellation,
816                    None,
817                    clock,
818                ),
819            )
820            .await
821            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
822        match outcome
823            .into_await_event()
824            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?
825        {
826            crate::Resolution::Ok(value) => Ok(value),
827            crate::Resolution::Err(err) => Err(crate::RuntimeError::new(err.code, err.message)),
828            crate::Resolution::Timeout => Err(crate::RuntimeError::new(
829                "durable_effect_event_timeout",
830                "durable effect external event wait timed out",
831            )),
832            crate::Resolution::Cancelled => Err(crate::RuntimeError::new(
833                "durable_effect_event_cancelled",
834                "durable effect external event wait was cancelled",
835            )),
836        }
837    }
838
839    pub async fn emit_process_event(
840        &self,
841        event_type: impl Into<String>,
842        payload: serde_json::Value,
843    ) -> Result<crate::ProcessEvent, crate::RuntimeError> {
844        let Some(process) = self.context.process_events.as_ref() else {
845            return Err(crate::RuntimeError::new(
846                "durable_effect_process_event_unavailable",
847                "durable effect process events are unavailable outside a durable process",
848            ));
849        };
850        let event_type = event_type.into();
851        if event_type.trim().is_empty() {
852            return Err(crate::RuntimeError::new(
853                "durable_effect_empty_process_event_type",
854                "durable effect process event type must be non-empty",
855            ));
856        }
857        let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
858            crate::RuntimeError::new(
859                "durable_effects_missing_call_id",
860                "durable effects require a prepared tool call id",
861            )
862        })?;
863        let sequence = self.context.durable_effects.next_process_event_sequence();
864        let request = crate::ProcessEventAppendRequest::new(event_type, payload).with_replay_key(
865            format!("tool:{tool_call_id}:durable-process-event:{sequence}"),
866        );
867        self.context
868            .process_events()
869            .emit_request(request)
870            .await
871            .map_err(|err| {
872                crate::RuntimeError::new(
873                    "durable_effect_process_event_append_failed",
874                    err.to_string(),
875                )
876            })
877            .and_then(|event| {
878                if event.process_id == process.process_id {
879                    Ok(event)
880                } else {
881                    Err(crate::RuntimeError::new(
882                        "durable_effect_process_event_process_mismatch",
883                        "process event append returned an event for a different process",
884                    ))
885                }
886            })
887    }
888
889    fn step_invocation(
890        &self,
891        effect_id_suffix: impl Into<String>,
892        kind: crate::RuntimeEffectKind,
893        replay_suffix: impl AsRef<str>,
894    ) -> Result<crate::RuntimeInvocation, crate::RuntimeError> {
895        let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
896            crate::RuntimeError::new(
897                "durable_effects_missing_call_id",
898                "durable effects require a prepared tool call id",
899            )
900        })?;
901        let effect_id_suffix = effect_id_suffix.into();
902        if let Some(parent) = self.context.parent_invocation.as_ref() {
903            return Ok(crate::runtime::causal::child_effect_invocation(
904                parent,
905                format!("{tool_call_id}:{effect_id_suffix}"),
906                kind,
907                replay_suffix,
908            ));
909        }
910        let scoped = self.context.effect_controller.scoped();
911        let replay_key = format!(
912            "{}:tool:{tool_call_id}:{}",
913            scoped.scope_id(),
914            replay_suffix.as_ref()
915        );
916        Ok(crate::RuntimeInvocation::effect(
917            crate::RuntimeScope::new(self.context.session_id.clone()),
918            format!("{tool_call_id}:{effect_id_suffix}"),
919            kind,
920            replay_key,
921        ))
922    }
923}
924
925/// Runtime-prepared executable tool call.
926///
927/// The raw model/provider identity remains visible, but any argument rewrites
928/// and provider-owned context projections are frozen before the call crosses a
929/// runtime effect or process boundary.
930#[derive(Clone, Debug, Serialize, Deserialize)]
931pub struct PreparedToolCall {
932    pub call_id: String,
933    pub tool_id: ToolId,
934    pub tool_name: String,
935    pub args: serde_json::Value,
936    #[serde(default, skip_serializing_if = "Option::is_none")]
937    pub replay: Option<ProviderReplayMeta>,
938    #[serde(default, skip_serializing_if = "serde_json::Value::is_null")]
939    pub prepared_payload: serde_json::Value,
940}
941
942impl PreparedToolCall {
943    pub fn identity(tool_id: ToolId, call: crate::sansio::PendingToolCall) -> Self {
944        Self {
945            call_id: call.call_id,
946            tool_id,
947            tool_name: call.tool_name,
948            args: call.args,
949            replay: call.replay,
950            prepared_payload: serde_json::Value::Null,
951        }
952    }
953
954    pub fn from_parts(
955        call_id: impl Into<String>,
956        tool_id: impl Into<ToolId>,
957        tool_name: impl Into<String>,
958        args: serde_json::Value,
959        replay: Option<ProviderReplayMeta>,
960        prepared_payload: serde_json::Value,
961    ) -> Self {
962        Self {
963            call_id: call_id.into(),
964            tool_id: tool_id.into(),
965            tool_name: tool_name.into(),
966            args,
967            replay,
968            prepared_payload,
969        }
970    }
971}
972
973/// One ordered child inside a runtime-prepared tool batch.
974///
975/// The call itself carries the executable provider payload. `replay_suffix`
976/// is the deterministic suffix used for child effects such as retry sleeps or
977/// pending completion awaits when the batch is the durable parent.
978#[derive(Clone, Debug, Serialize, Deserialize)]
979pub struct PreparedToolBatchCall {
980    pub call: PreparedToolCall,
981    pub replay_suffix: String,
982}
983
984/// Runtime-prepared executable tool batch.
985///
986/// The vector order is source order. Scheduling may run parallel-safe tools
987/// concurrently, but launches and pending completion consumption are projected
988/// back through this order.
989#[derive(Clone, Debug, Serialize, Deserialize)]
990pub struct PreparedToolBatch {
991    pub batch_id: String,
992    pub calls: Vec<PreparedToolBatchCall>,
993}
994
995impl PreparedToolBatch {
996    pub fn new(batch_id: impl Into<String>, calls: Vec<PreparedToolCall>) -> Self {
997        let batch_id = batch_id.into();
998        let calls = calls
999            .into_iter()
1000            .enumerate()
1001            .map(|(index, call)| PreparedToolBatchCall {
1002                replay_suffix: format!("child:{index}:{}", call.call_id),
1003                call,
1004            })
1005            .collect();
1006        Self { batch_id, calls }
1007    }
1008
1009    pub fn is_empty(&self) -> bool {
1010        self.calls.is_empty()
1011    }
1012
1013    pub fn len(&self) -> usize {
1014        self.calls.len()
1015    }
1016}
1017
1018#[derive(Clone)]
1019pub struct ToolPrepareContext {
1020    session_id: String,
1021    sessions: Arc<dyn SessionStateService>,
1022    turn_context: crate::TurnContext,
1023    tool_call_id: Option<String>,
1024}
1025
1026impl ToolPrepareContext {
1027    pub(crate) fn new(
1028        session_id: String,
1029        sessions: Arc<dyn SessionStateService>,
1030        turn_context: crate::TurnContext,
1031        tool_call_id: Option<String>,
1032    ) -> Self {
1033        Self {
1034            session_id,
1035            sessions,
1036            turn_context,
1037            tool_call_id,
1038        }
1039    }
1040
1041    pub fn session_id(&self) -> &str {
1042        &self.session_id
1043    }
1044
1045    pub fn tool_call_id(&self) -> Option<&str> {
1046        self.tool_call_id.as_deref()
1047    }
1048
1049    pub fn turn_context(&self) -> &crate::TurnContext {
1050        &self.turn_context
1051    }
1052
1053    pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
1054    where
1055        T: 'static,
1056    {
1057        self.turn_context.plugin_input::<T>(plugin_id)
1058    }
1059
1060    pub async fn session_snapshot(&self) -> Result<SessionSnapshot, PluginError> {
1061        self.sessions.snapshot_session(&self.session_id).await
1062    }
1063
1064    pub async fn tool_catalog(&self) -> Result<Vec<serde_json::Value>, PluginError> {
1065        self.sessions.tool_catalog(&self.session_id).await
1066    }
1067
1068    pub async fn shared_tool_catalog(
1069        &self,
1070    ) -> Result<std::sync::Arc<Vec<serde_json::Value>>, PluginError> {
1071        self.sessions.shared_tool_catalog(&self.session_id).await
1072    }
1073}
1074
1075/// Inputs handed to [`ToolProvider::prepare_tool_call`].
1076pub struct ToolPrepareCall<'a> {
1077    pub tool_id: ToolId,
1078    pub pending: crate::sansio::PendingToolCall,
1079    pub context: &'a ToolPrepareContext,
1080}
1081
1082/// Per-call inputs handed to [`ToolProvider::execute`].
1083///
1084/// Fields are `pub` because `ToolCall` is a transient borrow; consumers
1085/// typically destructure (`let ToolCall { name, args, .. } = call`). The
1086/// stable surface lives on [`ToolContext`] (sealed) and the runtime's
1087/// dispatcher, which constructs `ToolCall` values.
1088pub struct ToolCall<'a> {
1089    pub name: &'a str,
1090    pub args: &'a serde_json::Value,
1091    pub context: &'a ToolContext<'a>,
1092    pub progress: Option<&'a ProgressSender>,
1093}
1094
1095/// Trait for providing tools to the sandbox. Implement this per-project.
1096///
1097/// Implementations supply cheap [`ToolManifest`]s, lazily resolved
1098/// [`ToolContract`]s, and a single
1099/// [`execute`](Self::execute) method that handles every call. Tools that
1100/// need session state read it from `call.context`; tools that stream
1101/// progress send through `call.progress`.
1102#[async_trait::async_trait]
1103pub trait ToolProvider: Send + Sync + 'static {
1104    fn tool_manifests(&self) -> Vec<ToolManifest>;
1105    fn resolve_manifest(&self, name: &str) -> Option<ToolManifest> {
1106        self.tool_manifests()
1107            .into_iter()
1108            .find(|manifest| manifest.name == name)
1109    }
1110    fn resolve_manifest_by_id(&self, id: &ToolId) -> Option<ToolManifest> {
1111        self.tool_manifests()
1112            .into_iter()
1113            .find(|manifest| manifest.id == *id)
1114    }
1115    fn resolve_contract(&self, name: &str) -> Option<Arc<ToolContract>>;
1116    fn resolve_contract_by_id(&self, id: &ToolId) -> Option<Arc<ToolContract>> {
1117        let manifest = self.resolve_manifest_by_id(id)?;
1118        self.resolve_contract(&manifest.name)
1119    }
1120    async fn prepare_tool_call(
1121        &self,
1122        call: ToolPrepareCall<'_>,
1123    ) -> Result<PreparedToolCall, ToolResult> {
1124        Ok(PreparedToolCall::identity(call.tool_id, call.pending))
1125    }
1126    async fn execute(&self, call: ToolCall<'_>) -> ToolResult;
1127    async fn execute_by_id(
1128        &self,
1129        tool_id: &ToolId,
1130        args: &serde_json::Value,
1131        context: &ToolContext<'_>,
1132        progress: Option<&ProgressSender>,
1133    ) -> ToolResult {
1134        let Some(manifest) = self.resolve_manifest_by_id(tool_id) else {
1135            return ToolResult::err_fmt(format!("Unknown tool id: {tool_id}"));
1136        };
1137        self.execute(ToolCall {
1138            name: &manifest.name,
1139            args,
1140            context,
1141            progress,
1142        })
1143        .await
1144    }
1145}
1146
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ProcessRegistry;
    use crate::RuntimeEffectController;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Controller stub whose `execute_effect` always fails with
    /// `unexpected_effect`.
    struct NoDurableEffectController;

    #[async_trait::async_trait]
    impl crate::RuntimeEffectController for NoDurableEffectController {
        async fn execute_effect(
            &self,
            _envelope: crate::RuntimeEffectEnvelope,
            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
            let failure = crate::RuntimeEffectControllerError::new(
                "unexpected_effect",
                "test controller should not execute effects",
            );
            Err(failure)
        }
    }

    /// Builds a `ToolContext` for `session-1` backed by mock session services,
    /// an unavailable process service, an in-memory attachment store, and the
    /// supplied effect controller.
    fn test_context_with_controller(
        tool_call_id: Option<String>,
        controller: Arc<dyn crate::RuntimeEffectController>,
    ) -> ToolContext<'static> {
        let builder = ToolContext::builder(
            "session-1".to_string(),
            Arc::new(crate::testing::MockSessionManager::default()),
            Arc::new(crate::testing::MockSessionManager::default()),
            Arc::new(crate::testing::MockSessionManager::default()),
            Arc::new(crate::UnavailableProcessService),
            Arc::new(crate::DefaultProcessCancelAbility),
            crate::runtime::RuntimeEffectControllerHandle::shared(controller),
            Arc::new(crate::InMemoryAttachmentStore::new()),
            crate::DirectCompletionClient::unavailable(
                "direct completions are unavailable in this test context",
            ),
        );
        builder.tool_call_id(tool_call_id).build()
    }

    /// Shorthand for a context with the given call id and the inline effect
    /// controller.
    fn inline_context(call_id: &str) -> ToolContext<'static> {
        test_context_with_controller(
            Some(call_id.to_string()),
            Arc::new(crate::InlineRuntimeEffectController),
        )
    }

    #[test]
    fn tool_context_builder_carries_call_payload_and_cancellation_state() {
        let token = tokio_util::sync::CancellationToken::new();
        let prepared_call = PreparedToolCall::from_parts(
            "call-1",
            "tool:demo_tool",
            "demo_tool",
            serde_json::json!({ "input": true }),
            None,
            serde_json::json!({ "prepared": true }),
        );

        let builder = ToolContext::builder(
            "session-1".to_string(),
            Arc::new(crate::testing::MockSessionManager::default()),
            Arc::new(crate::testing::MockSessionManager::default()),
            Arc::new(crate::testing::MockSessionManager::default()),
            Arc::new(crate::UnavailableProcessService),
            Arc::new(crate::DefaultProcessCancelAbility),
            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
                crate::InlineRuntimeEffectController,
            )),
            Arc::new(crate::InMemoryAttachmentStore::new()),
            crate::DirectCompletionClient::unavailable(
                "direct completions are unavailable in this test context",
            ),
        );
        let context = builder
            .prepared_call(&prepared_call)
            .cancellation_token(Some(token.clone()))
            .async_process("process-1", token.clone())
            .build();

        assert_eq!(context.session_id(), "session-1");
        assert_eq!(context.tool_call_id(), Some("call-1"));
        assert_eq!(
            context.prepared_payload(),
            &serde_json::json!({ "prepared": true })
        );
        assert_eq!(context.async_process_id(), Some("process-1"));
        assert!(context.cancellation_token().is_some());
    }

    #[test]
    fn durable_effects_requires_prepared_call_id_and_supporting_controller() {
        // No call id: durable effects must be refused.
        let without_call_id =
            test_context_with_controller(None, Arc::new(crate::InlineRuntimeEffectController));
        let Err(err) = without_call_id.durable_effects() else {
            panic!("missing prepared tool call id should fail");
        };
        assert_eq!(err.code.as_str(), "durable_effects_missing_call_id");

        // Call id present, but the controller is the stub defined above.
        let with_stub_controller = test_context_with_controller(
            Some("call-1".to_string()),
            Arc::new(NoDurableEffectController),
        );
        let Err(err) = with_stub_controller.durable_effects() else {
            panic!("unsupported controller should fail");
        };
        assert_eq!(err.code.as_str(), "durable_effects_unavailable");
    }

    #[tokio::test]
    async fn durable_run_json_executes_and_maps_closure_errors() {
        let context = inline_context("call-run-json");
        let durable = context.durable_effects().expect("durable effects");

        let echoed = durable
            .run_json(
                "create",
                serde_json::json!({ "x": 1 }),
                |input| async move { Ok(serde_json::json!({ "seen": input["x"] })) },
            )
            .await
            .expect("durable step");
        assert_eq!(echoed, serde_json::json!({ "seen": 1 }));

        let failure = durable
            .run_json("fail", serde_json::json!({}), |_| async {
                Err(crate::RuntimeError::new(
                    "durable_step_failed",
                    "step failed",
                ))
            })
            .await
            .expect_err("closure error");
        assert_eq!(failure.code.as_str(), "durable_step_failed");
        assert_eq!(failure.message, "step failed");
    }

    #[tokio::test]
    async fn durable_run_json_rejects_empty_or_duplicate_step_ids_before_running() {
        let context = inline_context("call-step-ids");
        let durable = context.durable_effects().expect("durable effects");
        // Counts how many times a step closure actually ran.
        let executions = Arc::new(AtomicU64::new(0));

        let empty_id = durable
            .run_json("", serde_json::Value::Null, {
                let executions = Arc::clone(&executions);
                move |_| async move {
                    executions.fetch_add(1, Ordering::Relaxed);
                    Ok(serde_json::Value::Null)
                }
            })
            .await
            .expect_err("empty step id");
        assert_eq!(empty_id.code.as_str(), "durable_effect_empty_step_id");
        assert_eq!(executions.load(Ordering::Relaxed), 0);

        durable
            .run_json("same", serde_json::Value::Null, {
                let executions = Arc::clone(&executions);
                move |_| async move {
                    executions.fetch_add(1, Ordering::Relaxed);
                    Ok(serde_json::Value::Null)
                }
            })
            .await
            .expect("first step");
        let duplicate = durable
            .run_json("same", serde_json::Value::Null, {
                let executions = Arc::clone(&executions);
                move |_| async move {
                    executions.fetch_add(1, Ordering::Relaxed);
                    Ok(serde_json::Value::Null)
                }
            })
            .await
            .expect_err("duplicate step id");
        assert_eq!(duplicate.code.as_str(), "durable_effect_duplicate_step_id");
        // Only the first "same" step may have executed.
        assert_eq!(executions.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn durable_external_event_key_is_custom_and_stable() {
        let context = inline_context("call-event-key");
        let durable = context.durable_effects().expect("durable effects");

        let first = durable
            .external_event_key("tool-event-stable")
            .await
            .expect("first key");
        let second = durable
            .external_event_key("tool-event-stable")
            .await
            .expect("second key");

        assert_eq!(first, second);
        let expected_wait = crate::AwaitEventWaitIdentity::Custom {
            key: "tool-event-stable".to_string(),
        };
        assert_eq!(first.wait, expected_wait);
    }

    #[tokio::test]
    async fn durable_await_event_json_maps_terminal_resolutions() {
        let controller = Arc::new(crate::InlineRuntimeEffectController);
        let context =
            test_context_with_controller(Some("call-await-event".to_string()), controller.clone());
        let durable = context.durable_effects().expect("durable effects");

        // Ok resolution: the payload is returned as-is.
        let ok_key = durable
            .external_event_key("tool-event-ok")
            .await
            .expect("ok key");
        controller
            .resolve_await_event(
                &ok_key,
                crate::Resolution::Ok(serde_json::json!({ "answer": 42 })),
            )
            .await
            .expect("resolve ok");
        let payload = durable
            .await_event_json(ok_key)
            .await
            .expect("await ok value");
        assert_eq!(payload, serde_json::json!({ "answer": 42 }));

        // Err resolution: the external error code is surfaced.
        let err_key = durable
            .external_event_key("tool-event-err")
            .await
            .expect("err key");
        let external_failure = crate::ExternalCompletionError::new("external_bad", "external failed");
        controller
            .resolve_await_event(&err_key, crate::Resolution::Err(external_failure))
            .await
            .expect("resolve err");
        let external = durable
            .await_event_json(err_key)
            .await
            .expect_err("await err value");
        assert_eq!(external.code.as_str(), "external_bad");

        // Cancelled resolution.
        let cancelled_key = durable
            .external_event_key("tool-event-cancelled")
            .await
            .expect("cancelled key");
        controller
            .resolve_await_event(&cancelled_key, crate::Resolution::Cancelled)
            .await
            .expect("resolve cancelled");
        let cancelled = durable
            .await_event_json(cancelled_key)
            .await
            .expect_err("await cancelled value");
        assert_eq!(cancelled.code.as_str(), "durable_effect_event_cancelled");

        // Timeout resolution.
        let timeout_key = durable
            .external_event_key("tool-event-timeout")
            .await
            .expect("timeout key");
        controller
            .resolve_await_event(&timeout_key, crate::Resolution::Timeout)
            .await
            .expect("resolve timeout");
        let timed_out = durable
            .await_event_json(timeout_key)
            .await
            .expect_err("await timeout value");
        assert_eq!(timed_out.code.as_str(), "durable_effect_event_timeout");
    }

    #[tokio::test]
    async fn durable_emit_process_event_requires_process_and_appends_inside_process() {
        // Without a process binding, the emit is rejected up front.
        let detached = inline_context("call-no-process");
        let unavailable = detached
            .durable_effects()
            .expect("durable effects")
            .emit_process_event("tool.event", serde_json::json!({}))
            .await
            .expect_err("outside process");
        assert_eq!(
            unavailable.code.as_str(),
            "durable_effect_process_event_unavailable"
        );

        // Register a process that declares the `tool.event` event type.
        let registry = Arc::new(crate::TestLocalProcessRegistry::default());
        let process_id = "process:durable-tool-event";
        let declared_event = crate::ProcessEventType {
            name: "tool.event".to_string(),
            payload_schema: crate::LashSchema::any(),
            semantics: crate::ProcessEventSemanticsSpec::default(),
        };
        let registration = crate::ProcessRegistration::new(
            process_id,
            crate::ProcessInput::External {
                metadata: serde_json::json!({}),
            },
            crate::ProcessProvenance::host(),
        )
        .with_extra_event_types([declared_event]);
        registry
            .register_process(registration)
            .await
            .expect("register process");

        let registry_dyn: Arc<dyn crate::ProcessRegistry> = registry;
        let attached = inline_context("call-process-event")
            .with_process_events_for_testing(process_id, registry_dyn);

        let event = attached
            .durable_effects()
            .expect("durable effects")
            .emit_process_event("tool.event", serde_json::json!({ "ok": true }))
            .await
            .expect("process event");
        assert_eq!(event.process_id, process_id);
        assert_eq!(event.event_type, "tool.event");
        assert_eq!(event.payload, serde_json::json!({ "ok": true }));
        assert_eq!(
            event.invocation.replay_key(),
            Some("tool:call-process-event:durable-process-event:0")
        );

        // An event type the process never declared must fail the append.
        let append_err = attached
            .durable_effects()
            .expect("durable effects")
            .emit_process_event("undeclared.event", serde_json::json!({}))
            .await
            .expect_err("undeclared event type must fail the append");
        assert_eq!(
            append_err.code.as_str(),
            "durable_effect_process_event_append_failed"
        );
    }
}