Skip to main content

lash_core/
tool_provider.rs

1use std::collections::HashSet;
2use std::future::Future;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5
6use lash_sansio::llm::types::ProviderReplayMeta;
7use serde::{Deserialize, Serialize};
8
9use crate::plugin::{
10    PluginError, SessionGraphService, SessionLifecycleService, SessionSnapshot, SessionStateService,
11};
12use crate::{AttachmentStore, ToolContract, ToolId, ToolManifest, ToolResult};
13
14mod attachments;
15mod direct_completion;
16mod dispatch;
17mod process;
18pub(crate) mod process_events;
19mod session;
20mod triggers;
21
22pub use attachments::ToolAttachmentClient;
23pub use direct_completion::ToolDirectCompletionClient;
24pub use dispatch::ToolDispatchClient;
25pub use process::ToolSessionProcessAdmin;
26pub use process_events::ToolProcessEventClient;
27pub use session::{ToolSessionAdmin, ToolSessionModel};
28pub use triggers::ToolTriggerClient;
29
/// A progress message travelling from the sandbox to the host while a tool
/// is still executing.
#[derive(Debug, Clone)]
pub struct SandboxMessage {
    /// Message body delivered to the host.
    pub text: String,
    /// Host-rendered progress event kind, e.g. `"tool_output"`.
    pub kind: String,
}
37
38/// Sender for streaming progress messages from tools (e.g. live bash output).
39pub type ProgressSender = tokio::sync::mpsc::UnboundedSender<SandboxMessage>;
40
41#[derive(Clone, Default)]
42pub(crate) struct ToolCompletionState {
43    key: Arc<Mutex<Option<crate::AwaitEventKey>>>,
44}
45
46impl ToolCompletionState {
47    fn store(
48        &self,
49        key: crate::AwaitEventKey,
50    ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
51        let mut guard = self.key.lock().map_err(|_| {
52            crate::RuntimeError::new(
53                "tool_completion_state_poisoned",
54                "tool completion key state lock poisoned",
55            )
56        })?;
57        if let Some(existing) = guard.as_ref() {
58            return Ok(existing.clone());
59        }
60        *guard = Some(key.clone());
61        Ok(key)
62    }
63
64    pub(crate) fn take(&self) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
65        self.key.lock().map(|mut guard| guard.take()).map_err(|_| {
66            crate::RuntimeError::new(
67                "tool_completion_state_poisoned",
68                "tool completion key state lock poisoned",
69            )
70        })
71    }
72}
73
74#[derive(Clone, Default)]
75pub(crate) struct ToolDurableEffectState {
76    step_ids: Arc<Mutex<HashSet<String>>>,
77    process_event_sequence: Arc<AtomicU64>,
78}
79
80impl ToolDurableEffectState {
81    fn reserve_step(&self, step_id: &str) -> Result<(), crate::RuntimeError> {
82        let mut guard = self.step_ids.lock().map_err(|_| {
83            crate::RuntimeError::new(
84                "durable_effect_state_poisoned",
85                "durable effect step state lock poisoned",
86            )
87        })?;
88        if !guard.insert(step_id.to_string()) {
89            return Err(crate::RuntimeError::new(
90                "durable_effect_duplicate_step_id",
91                format!("durable effect step id `{step_id}` was already used by this tool call"),
92            ));
93        }
94        Ok(())
95    }
96
97    fn next_process_event_sequence(&self) -> u64 {
98        self.process_event_sequence.fetch_add(1, Ordering::Relaxed)
99    }
100}
101
102/// Per-call environment for [`ToolProvider::execute`]. Fields are sealed so
103/// the runtime can add capabilities without breaking tool authors.
104#[derive(Clone)]
105pub struct ToolContext<'run> {
106    pub(crate) session_id: String,
107    pub(crate) agent_frame_id: crate::AgentFrameId,
108    pub(crate) sessions: Arc<dyn SessionStateService>,
109    pub(crate) session_lifecycle: Arc<dyn SessionLifecycleService>,
110    pub(crate) processes: Arc<dyn crate::ProcessService>,
111    pub(crate) process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
112    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
113    pub(crate) runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
114    pub(crate) runtime_execution_context: Option<crate::RuntimeExecutionContext<'run>>,
115    pub(crate) cancellation_token: Option<tokio_util::sync::CancellationToken>,
116    pub(crate) async_process_id: Option<String>,
117    pub(crate) runtime_process_id: Option<String>,
118    pub(crate) process_events: Option<ToolProcessEventContext>,
119    pub(crate) attachment_store: Arc<dyn AttachmentStore>,
120    pub(crate) direct_completions: crate::DirectCompletionClient<'run>,
121    pub(crate) prepared_payload: serde_json::Value,
122    /// The id of the in-flight tool call that is invoking this tool.
123    pub(crate) tool_call_id: Option<String>,
124    pub(crate) attempt_number: u32,
125    pub(crate) max_attempts: u32,
126    pub(crate) replay_key: Option<String>,
127    pub(crate) completion: ToolCompletionState,
128    pub(crate) durable_effects: ToolDurableEffectState,
129    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
130    pub(crate) execution_env_spec: crate::ProcessExecutionEnvSpec,
131    pub(crate) child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
132}
133
/// Payload handed to a [`ToolChildExecutionTraceHook`] when a child process
/// start is reported.
#[derive(Clone, Debug)]
pub struct ToolChildProcessStarted {
    /// Identifier of the child process that was started.
    pub process_id: String,
    /// Entry name of the child, when the reporter supplied one.
    pub child_entry_name: Option<String>,
}

/// Cloneable handle around a callback that observes child-process starts.
///
/// Clones share the same underlying callback.
#[derive(Clone)]
pub struct ToolChildExecutionTraceHook {
    on_child_process_started: Arc<dyn Fn(ToolChildProcessStarted) + Send + Sync>,
}

// The callback is an opaque closure, so `Debug` cannot be derived; print the
// type name only so the hook can still appear inside `{:?}` output.
impl std::fmt::Debug for ToolChildExecutionTraceHook {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ToolChildExecutionTraceHook")
            .finish_non_exhaustive()
    }
}

impl ToolChildExecutionTraceHook {
    /// Wrap `on_child_process_started` so it can be cloned and shared across
    /// threads.
    pub fn new(
        on_child_process_started: impl Fn(ToolChildProcessStarted) + Send + Sync + 'static,
    ) -> Self {
        Self {
            on_child_process_started: Arc::new(on_child_process_started),
        }
    }

    /// Invoke the wrapped callback synchronously with `event`.
    pub fn child_process_started(&self, event: ToolChildProcessStarted) {
        (self.on_child_process_started)(event);
    }
}
158
159#[derive(Clone)]
160pub(crate) struct ToolProcessEventContext {
161    process_id: String,
162    registry: Arc<dyn crate::ProcessRegistry>,
163    store: Option<Arc<dyn crate::RuntimePersistence>>,
164    session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
165    session_graph: Arc<dyn SessionGraphService>,
166    queued_work_driver: Option<crate::QueuedWorkDriver>,
167}
168
169pub(crate) struct ToolContextBuilder<'run> {
170    session_id: String,
171    agent_frame_id: crate::AgentFrameId,
172    sessions: Arc<dyn SessionStateService>,
173    session_lifecycle: Arc<dyn SessionLifecycleService>,
174    session_graph: Arc<dyn SessionGraphService>,
175    processes: Arc<dyn crate::ProcessService>,
176    process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
177    effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
178    runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
179    runtime_execution_context: Option<crate::RuntimeExecutionContext<'run>>,
180    cancellation_token: Option<tokio_util::sync::CancellationToken>,
181    async_process_id: Option<String>,
182    runtime_process_id: Option<String>,
183    process_events: Option<ToolProcessEventContext>,
184    attachment_store: Arc<dyn AttachmentStore>,
185    direct_completions: crate::DirectCompletionClient<'run>,
186    prepared_payload: serde_json::Value,
187    tool_call_id: Option<String>,
188    completion: ToolCompletionState,
189    durable_effects: ToolDurableEffectState,
190    parent_invocation: Option<crate::RuntimeInvocation>,
191    execution_env_spec: crate::ProcessExecutionEnvSpec,
192    child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
193}
194
195impl<'run> ToolContextBuilder<'run> {
196    pub(crate) fn from_dispatch(
197        dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
198    ) -> Self {
199        Self {
200            session_id: dispatch.session_id.clone(),
201            agent_frame_id: dispatch.agent_frame_id.clone(),
202            sessions: Arc::clone(&dispatch.sessions),
203            session_lifecycle: Arc::clone(&dispatch.session_lifecycle),
204            session_graph: Arc::clone(&dispatch.session_graph),
205            processes: Arc::clone(&dispatch.processes),
206            process_cancel_ability: Arc::clone(&dispatch.process_cancel_ability),
207            effect_controller: dispatch.effect_controller.clone(),
208            runtime_dispatch: Some(Arc::clone(&dispatch)),
209            runtime_execution_context: None,
210            cancellation_token: None,
211            async_process_id: None,
212            runtime_process_id: None,
213            process_events: None,
214            attachment_store: Arc::clone(&dispatch.attachment_store),
215            direct_completions: dispatch.direct_completions.clone(),
216            prepared_payload: serde_json::Value::Null,
217            tool_call_id: None,
218            completion: ToolCompletionState::default(),
219            durable_effects: ToolDurableEffectState::default(),
220            parent_invocation: dispatch.parent_invocation.clone(),
221            execution_env_spec: dispatch.execution_env_spec.clone(),
222            child_execution_trace_hook: None,
223        }
224    }
225
226    #[cfg(any(test, feature = "testing"))]
227    pub(crate) fn tool_call_id(mut self, tool_call_id: impl Into<Option<String>>) -> Self {
228        self.tool_call_id = tool_call_id.into();
229        self
230    }
231
232    pub(crate) fn prepared_call(mut self, call: &PreparedToolCall) -> Self {
233        self.tool_call_id = Some(call.call_id.clone());
234        self.prepared_payload = call.prepared_payload.clone();
235        self
236    }
237
238    pub(crate) fn cancellation_token(
239        mut self,
240        cancellation_token: Option<tokio_util::sync::CancellationToken>,
241    ) -> Self {
242        self.cancellation_token = cancellation_token;
243        self
244    }
245
246    pub(crate) fn runtime_execution_context(
247        mut self,
248        context: crate::RuntimeExecutionContext<'run>,
249    ) -> Self {
250        self.runtime_execution_context = Some(context);
251        self
252    }
253
254    pub(crate) fn runtime_process_id(mut self, process_id: Option<String>) -> Self {
255        self.runtime_process_id = process_id;
256        self
257    }
258
259    pub(crate) fn async_process(
260        mut self,
261        process_id: impl Into<String>,
262        cancellation_token: tokio_util::sync::CancellationToken,
263    ) -> Self {
264        self.async_process_id = Some(process_id.into());
265        self.cancellation_token = Some(cancellation_token);
266        self
267    }
268
269    pub(crate) fn process_events(
270        mut self,
271        process_id: impl Into<String>,
272        registry: Arc<dyn crate::ProcessRegistry>,
273        store: Option<Arc<dyn crate::RuntimePersistence>>,
274        session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
275        queued_work_driver: Option<crate::QueuedWorkDriver>,
276    ) -> Self {
277        self.process_events = Some(ToolProcessEventContext {
278            process_id: process_id.into(),
279            registry,
280            store,
281            session_store_factory,
282            session_graph: Arc::clone(&self.session_graph),
283            queued_work_driver,
284        });
285        self
286    }
287
288    pub(crate) fn parent_invocation(mut self, metadata: Option<crate::RuntimeInvocation>) -> Self {
289        self.parent_invocation = metadata;
290        self
291    }
292
293    pub(crate) fn child_execution_trace_hook(
294        mut self,
295        hook: Option<ToolChildExecutionTraceHook>,
296    ) -> Self {
297        self.child_execution_trace_hook = hook;
298        self
299    }
300
301    pub(crate) fn build(self) -> ToolContext<'run> {
302        ToolContext {
303            session_id: self.session_id,
304            agent_frame_id: self.agent_frame_id,
305            sessions: self.sessions,
306            session_lifecycle: self.session_lifecycle,
307            processes: self.processes,
308            process_cancel_ability: self.process_cancel_ability,
309            effect_controller: self.effect_controller,
310            runtime_dispatch: self.runtime_dispatch,
311            runtime_execution_context: self.runtime_execution_context,
312            cancellation_token: self.cancellation_token,
313            async_process_id: self.async_process_id,
314            runtime_process_id: self.runtime_process_id,
315            process_events: self.process_events,
316            attachment_store: self.attachment_store,
317            direct_completions: self.direct_completions,
318            prepared_payload: self.prepared_payload,
319            tool_call_id: self.tool_call_id,
320            attempt_number: 1,
321            max_attempts: 1,
322            replay_key: None,
323            completion: self.completion,
324            durable_effects: self.durable_effects,
325            parent_invocation: self.parent_invocation,
326            execution_env_spec: self.execution_env_spec,
327            child_execution_trace_hook: self.child_execution_trace_hook,
328        }
329    }
330}
331
332impl<'run> ToolContext<'run> {
333    #[cfg(any(test, feature = "testing"))]
334    #[expect(
335        clippy::too_many_arguments,
336        reason = "testing constructor mirrors the sealed runtime tool context dependencies"
337    )]
338    pub(crate) fn builder(
339        session_id: String,
340        sessions: Arc<dyn SessionStateService>,
341        session_lifecycle: Arc<dyn SessionLifecycleService>,
342        session_graph: Arc<dyn SessionGraphService>,
343        processes: Arc<dyn crate::ProcessService>,
344        process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
345        effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
346        attachment_store: Arc<dyn AttachmentStore>,
347        direct_completions: crate::DirectCompletionClient<'run>,
348    ) -> ToolContextBuilder<'run> {
349        ToolContextBuilder {
350            session_id,
351            agent_frame_id: String::new(),
352            sessions,
353            session_lifecycle,
354            session_graph,
355            processes,
356            process_cancel_ability,
357            effect_controller,
358            runtime_dispatch: None,
359            runtime_execution_context: None,
360            cancellation_token: None,
361            async_process_id: None,
362            runtime_process_id: None,
363            process_events: None,
364            attachment_store,
365            direct_completions,
366            prepared_payload: serde_json::Value::Null,
367            tool_call_id: None,
368            completion: ToolCompletionState::default(),
369            durable_effects: ToolDurableEffectState::default(),
370            parent_invocation: None,
371            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
372                crate::PluginOptions::default(),
373                crate::SessionPolicy::default(),
374            ),
375            child_execution_trace_hook: None,
376        }
377    }
378
379    pub(crate) fn from_dispatch(
380        dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
381    ) -> ToolContextBuilder<'run> {
382        ToolContextBuilder::from_dispatch(dispatch)
383    }
384
385    pub fn session_id(&self) -> &str {
386        &self.session_id
387    }
388
389    pub fn agent_frame_id(&self) -> &str {
390        &self.agent_frame_id
391    }
392
393    pub fn sessions(&self) -> ToolSessionAdmin<'run> {
394        ToolSessionAdmin {
395            session_id: self.session_id.clone(),
396            sessions: Arc::clone(&self.sessions),
397            session_lifecycle: Arc::clone(&self.session_lifecycle),
398            effect_controller: self.effect_controller.clone(),
399        }
400    }
401
402    pub fn dispatch(&self) -> ToolDispatchClient<'run> {
403        ToolDispatchClient {
404            context: self.clone(),
405        }
406    }
407
408    pub fn triggers(&self) -> ToolTriggerClient<'run> {
409        ToolTriggerClient {
410            context: self.clone(),
411        }
412    }
413
414    pub fn processes(&self) -> ToolSessionProcessAdmin<'run> {
415        ToolSessionProcessAdmin {
416            session_id: self.session_id.clone(),
417            agent_frame_id: self.agent_frame_id.clone(),
418            processes: Arc::clone(&self.processes),
419            process_cancel_ability: Arc::clone(&self.process_cancel_ability),
420            effect_controller: self.effect_controller.clone(),
421            parent_invocation: self.parent_invocation.clone(),
422            tool_call_id: self.tool_call_id.clone(),
423            execution_env_spec: self.execution_env_spec.clone(),
424        }
425    }
426
427    pub fn emit_child_process_started(
428        &self,
429        process_id: impl Into<String>,
430        child_entry_name: Option<String>,
431    ) {
432        let Some(hook) = &self.child_execution_trace_hook else {
433            return;
434        };
435        hook.child_process_started(ToolChildProcessStarted {
436            process_id: process_id.into(),
437            child_entry_name,
438        });
439    }
440
441    pub fn direct_completions(&self) -> ToolDirectCompletionClient<'run> {
442        ToolDirectCompletionClient {
443            session_id: self.session_id.clone(),
444            tool_call_id: self.tool_call_id.clone(),
445            direct_completions: self.direct_completions.clone(),
446            parent_invocation: self.parent_invocation.clone(),
447            parent_tool_batch_is_durable: self.parent_invocation.as_ref().is_some_and(
448                |invocation| invocation.effect_kind() == Some(crate::RuntimeEffectKind::ToolBatch),
449            ) && self
450                .effect_controller
451                .controller()
452                .durability_tier()
453                == crate::DurabilityTier::Durable,
454        }
455    }
456
457    pub fn attachments(&self) -> ToolAttachmentClient {
458        ToolAttachmentClient {
459            store: Arc::clone(&self.attachment_store),
460        }
461    }
462
463    pub fn process_events(&self) -> ToolProcessEventClient {
464        ToolProcessEventClient {
465            context: self.process_events.clone(),
466        }
467    }
468
469    /// Borrow this tool call's durable effect boundary.
470    ///
471    /// This is available only while executing a prepared tool call under a
472    /// controller that explicitly supports durable tool effects. The returned
473    /// facade records JSON steps and await-event waits in the caller's existing
474    /// effect log; it does not expose the underlying workflow engine context.
475    pub fn durable_effects(&self) -> Result<ToolDurableEffects<'_, 'run>, crate::RuntimeError> {
476        let Some(tool_call_id) = self.tool_call_id.as_deref() else {
477            return Err(crate::RuntimeError::new(
478                "durable_effects_missing_call_id",
479                "durable effects require a prepared tool call id",
480            ));
481        };
482        if tool_call_id.trim().is_empty() {
483            return Err(crate::RuntimeError::new(
484                "durable_effects_missing_call_id",
485                "durable effects require a non-empty prepared tool call id",
486            ));
487        }
488        let scoped = self.effect_controller.scoped();
489        if !scoped.controller().supports_durable_effects() {
490            return Err(crate::RuntimeError::new(
491                "durable_effects_unavailable",
492                "this effect controller does not support durable tool effects",
493            ));
494        }
495        if self.parent_invocation.as_ref().is_some_and(|invocation| {
496            invocation.effect_kind() == Some(crate::RuntimeEffectKind::ToolBatch)
497        }) && scoped.controller().durability_tier() == crate::DurabilityTier::Durable
498        {
499            return Err(crate::RuntimeError::new(
500                "durable_effects_unavailable_in_tool_batch",
501                "durable tool sub-effects are not available inside a tool batch child",
502            ));
503        }
504        Ok(ToolDurableEffects { context: self })
505    }
506
507    pub fn cancellation_token(&self) -> Option<&tokio_util::sync::CancellationToken> {
508        self.cancellation_token.as_ref()
509    }
510
511    pub fn async_process_id(&self) -> Option<&str> {
512        self.async_process_id.as_deref()
513    }
514
515    pub fn runtime_process_id(&self) -> Option<&str> {
516        self.async_process_id
517            .as_deref()
518            .or(self.runtime_process_id.as_deref())
519            .or_else(|| {
520                self.process_events
521                    .as_ref()
522                    .map(|context| context.process_id.as_str())
523            })
524    }
525
526    pub fn tool_call_id(&self) -> Option<&str> {
527        self.tool_call_id.as_deref()
528    }
529
530    pub fn prepared_payload(&self) -> &serde_json::Value {
531        &self.prepared_payload
532    }
533
534    pub fn decode_prepared_payload<T>(&self) -> Result<T, serde_json::Error>
535    where
536        T: serde::de::DeserializeOwned,
537    {
538        serde_json::from_value(self.prepared_payload.clone())
539    }
540
541    pub fn attempt_number(&self) -> u32 {
542        self.attempt_number
543    }
544
545    pub fn max_attempts(&self) -> u32 {
546        self.max_attempts
547    }
548
549    pub fn replay_key(&self) -> Option<&str> {
550        self.replay_key.as_deref()
551    }
552
553    /// Obtain the durable completion key for this call, required before returning
554    /// [`ToolResult::Pending`](crate::ToolResult::Pending).
555    ///
556    /// A tool that defers its outcome (waiting on a webhook, human approval, or another
557    /// service) calls this, hands the returned [`AwaitEventKey`](crate::AwaitEventKey)
558    /// to whatever will complete the work out-of-band, and then returns
559    /// `ToolResult::Pending(..)`. The key names the durable wait the runtime parks the
560    /// call on; the external resolver delivers the result against it later.
561    ///
562    /// The key is stored on the context and consumed by the dispatcher when the tool
563    /// returns `Pending`. Returning `Pending` without first calling this fails the call
564    /// with `pending_tool_missing_completion_key`. Calls made outside a prepared tool
565    /// invocation (no tool call id) fail with `tool_completion_key_missing_call_id`.
566    pub async fn completion_key(&self) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
567        let tool_call_id = self.tool_call_id.clone().ok_or_else(|| {
568            crate::RuntimeError::new(
569                "tool_completion_key_missing_call_id",
570                "completion keys require a prepared tool call id",
571            )
572        })?;
573        let scoped = self.effect_controller.scoped();
574        let key = scoped
575            .controller()
576            .await_event_key(
577                scoped.execution_scope(),
578                crate::AwaitEventWaitIdentity::tool_completion(tool_call_id),
579            )
580            .await?;
581        self.completion.store(key)
582    }
583
584    pub(crate) fn take_completion_key(
585        &self,
586    ) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
587        self.completion.take()
588    }
589
590    pub fn with_async_process(
591        mut self,
592        process_id: impl Into<String>,
593        cancellation_token: tokio_util::sync::CancellationToken,
594    ) -> Self {
595        self.async_process_id = Some(process_id.into());
596        self.runtime_process_id = self.async_process_id.clone();
597        self.cancellation_token = Some(cancellation_token);
598        self
599    }
600
601    #[cfg(any(test, feature = "testing"))]
602    #[doc(hidden)]
603    pub fn with_process_events_for_testing(
604        mut self,
605        process_id: impl Into<String>,
606        registry: Arc<dyn crate::ProcessRegistry>,
607    ) -> Self {
608        self.process_events = Some(ToolProcessEventContext {
609            process_id: process_id.into(),
610            registry,
611            store: None,
612            session_store_factory: None,
613            session_graph: Arc::new(crate::plugin::NoopSessionManager),
614            queued_work_driver: None,
615        });
616        self
617    }
618
619    pub(crate) fn with_retry_context(
620        mut self,
621        tool_name: &str,
622        attempt_number: u32,
623        max_attempts: u32,
624    ) -> Self {
625        self.attempt_number = attempt_number.max(1);
626        self.max_attempts = max_attempts.max(1);
627        self.replay_key = self
628            .tool_call_id
629            .as_ref()
630            .map(|call_id| format!("lash-tool:{}:{call_id}:{tool_name}", self.session_id));
631        self
632    }
633
634    pub(crate) fn with_prepared_payload(mut self, payload: serde_json::Value) -> Self {
635        self.prepared_payload = payload;
636        self
637    }
638
639    /// Constructor reserved for `lash_core::testing` helpers. Do not call directly;
640    /// use [`lash_core::testing::mock_tool_context`] instead.
641    #[cfg(any(test, feature = "testing"))]
642    #[doc(hidden)]
643    #[expect(
644        clippy::too_many_arguments,
645        reason = "test-only constructor mirrors the sealed runtime tool context"
646    )]
647    pub fn __for_testing(
648        session_id: String,
649        sessions: Arc<dyn SessionStateService>,
650        session_lifecycle: Arc<dyn SessionLifecycleService>,
651        session_graph: Arc<dyn SessionGraphService>,
652        processes: Arc<dyn crate::ProcessService>,
653        attachment_store: Arc<dyn AttachmentStore>,
654        direct_completions: crate::DirectCompletionClient<'static>,
655        tool_call_id: Option<String>,
656    ) -> ToolContext<'static> {
657        ToolContext::builder(
658            session_id,
659            sessions,
660            session_lifecycle,
661            session_graph,
662            processes,
663            Arc::new(crate::DefaultProcessCancelAbility),
664            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
665                crate::InlineRuntimeEffectController,
666            )),
667            attachment_store,
668            direct_completions,
669        )
670        .tool_call_id(tool_call_id)
671        .build()
672    }
673
674    /// Constructor reserved for tests that need a custom process-cancel host
675    /// ability. Do not call directly; prefer public testing helpers when they
676    /// cover the case.
677    #[cfg(any(test, feature = "testing"))]
678    #[doc(hidden)]
679    #[expect(
680        clippy::too_many_arguments,
681        reason = "test-only constructor mirrors the sealed runtime context"
682    )]
683    pub fn __for_testing_with_process_cancel_ability(
684        session_id: String,
685        sessions: Arc<dyn SessionStateService>,
686        session_lifecycle: Arc<dyn SessionLifecycleService>,
687        session_graph: Arc<dyn SessionGraphService>,
688        processes: Arc<dyn crate::ProcessService>,
689        process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
690        attachment_store: Arc<dyn AttachmentStore>,
691        direct_completions: crate::DirectCompletionClient<'static>,
692        tool_call_id: Option<String>,
693    ) -> ToolContext<'static> {
694        ToolContext::builder(
695            session_id,
696            sessions,
697            session_lifecycle,
698            session_graph,
699            processes,
700            process_cancel_ability,
701            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
702                crate::InlineRuntimeEffectController,
703            )),
704            attachment_store,
705            direct_completions,
706        )
707        .tool_call_id(tool_call_id)
708        .build()
709    }
710}
711
712/// Durable effect operations available to advanced in-process tools.
713///
714/// The facade borrows the caller's existing runtime effect boundary. It records
715/// JSON-only local steps and awaits host-signed event keys without exposing
716/// Restate, Temporal, or any other workflow-native context.
717pub struct ToolDurableEffects<'ctx, 'run> {
718    context: &'ctx ToolContext<'run>,
719}
720
721impl<'ctx, 'run> ToolDurableEffects<'ctx, 'run> {
722    pub async fn run_json<F, Fut>(
723        &self,
724        step_id: impl Into<String>,
725        input: serde_json::Value,
726        run: F,
727    ) -> Result<serde_json::Value, crate::RuntimeError>
728    where
729        F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
730        Fut: Future<Output = Result<serde_json::Value, crate::RuntimeError>> + Send + 'run,
731    {
732        let step_id = step_id.into();
733        if step_id.trim().is_empty() {
734            return Err(crate::RuntimeError::new(
735                "durable_effect_empty_step_id",
736                "durable effect step id must be non-empty",
737            ));
738        }
739        self.context.durable_effects.reserve_step(&step_id)?;
740        let invocation = self.step_invocation(
741            format!("durable-step:{step_id}"),
742            crate::RuntimeEffectKind::DurableStep,
743            format!("durable-step:{step_id}"),
744        )?;
745        let outcome = self
746            .context
747            .effect_controller
748            .controller()
749            .execute_effect(
750                crate::RuntimeEffectEnvelope::new(
751                    invocation,
752                    crate::RuntimeEffectCommand::DurableStep {
753                        step_id,
754                        input: input.clone(),
755                    },
756                ),
757                crate::RuntimeEffectLocalExecutor::durable_step(run),
758            )
759            .await
760            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
761        outcome
762            .into_durable_step()
763            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)
764    }
765
766    pub async fn external_event_key(
767        &self,
768        key: impl Into<String>,
769    ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
770        let key = key.into();
771        if key.trim().is_empty() {
772            return Err(crate::RuntimeError::new(
773                "durable_effect_empty_event_key",
774                "durable effect external event key must be non-empty",
775            ));
776        }
777        let scoped = self.context.effect_controller.scoped();
778        scoped
779            .controller()
780            .await_event_key(
781                scoped.execution_scope(),
782                crate::AwaitEventWaitIdentity::Custom { key },
783            )
784            .await
785    }
786
787    pub async fn await_event_json(
788        &self,
789        key: crate::AwaitEventKey,
790    ) -> Result<serde_json::Value, crate::RuntimeError> {
791        let invocation = self.step_invocation(
792            format!("await-event:{}", key.key_id),
793            crate::RuntimeEffectKind::AwaitEvent,
794            format!("await-event:{}", key.key_id),
795        )?;
796        let cancellation = self.context.cancellation_token.clone().unwrap_or_default();
797        let clock = self
798            .context
799            .runtime_dispatch
800            .as_ref()
801            .map(|dispatch| Arc::clone(&dispatch.clock))
802            .unwrap_or_else(|| Arc::new(crate::SystemClock));
803        let outcome = self
804            .context
805            .effect_controller
806            .controller()
807            .execute_effect(
808                crate::RuntimeEffectEnvelope::new(
809                    invocation,
810                    crate::RuntimeEffectCommand::AwaitEvent { key },
811                ),
812                crate::RuntimeEffectLocalExecutor::await_event_with_clock(
813                    cancellation,
814                    None,
815                    clock,
816                ),
817            )
818            .await
819            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
820        match outcome
821            .into_await_event()
822            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?
823        {
824            crate::Resolution::Ok(value) => Ok(value),
825            crate::Resolution::Err(err) => Err(crate::RuntimeError::new(err.code, err.message)),
826            crate::Resolution::Timeout => Err(crate::RuntimeError::new(
827                "durable_effect_event_timeout",
828                "durable effect external event wait timed out",
829            )),
830            crate::Resolution::Cancelled => Err(crate::RuntimeError::new(
831                "durable_effect_event_cancelled",
832                "durable effect external event wait was cancelled",
833            )),
834        }
835    }
836
837    pub async fn emit_process_event(
838        &self,
839        event_type: impl Into<String>,
840        payload: serde_json::Value,
841    ) -> Result<crate::ProcessEvent, crate::RuntimeError> {
842        let Some(process) = self.context.process_events.as_ref() else {
843            return Err(crate::RuntimeError::new(
844                "durable_effect_process_event_unavailable",
845                "durable effect process events are unavailable outside a durable process",
846            ));
847        };
848        let event_type = event_type.into();
849        if event_type.trim().is_empty() {
850            return Err(crate::RuntimeError::new(
851                "durable_effect_empty_process_event_type",
852                "durable effect process event type must be non-empty",
853            ));
854        }
855        let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
856            crate::RuntimeError::new(
857                "durable_effects_missing_call_id",
858                "durable effects require a prepared tool call id",
859            )
860        })?;
861        let sequence = self.context.durable_effects.next_process_event_sequence();
862        let request = crate::ProcessEventAppendRequest::new(event_type, payload).with_replay_key(
863            format!("tool:{tool_call_id}:durable-process-event:{sequence}"),
864        );
865        self.context
866            .process_events()
867            .emit_request(request)
868            .await
869            .map_err(|err| {
870                crate::RuntimeError::new(
871                    "durable_effect_process_event_append_failed",
872                    err.to_string(),
873                )
874            })
875            .and_then(|event| {
876                if event.process_id == process.process_id {
877                    Ok(event)
878                } else {
879                    Err(crate::RuntimeError::new(
880                        "durable_effect_process_event_process_mismatch",
881                        "process event append returned an event for a different process",
882                    ))
883                }
884            })
885    }
886
887    fn step_invocation(
888        &self,
889        effect_id_suffix: impl Into<String>,
890        kind: crate::RuntimeEffectKind,
891        replay_suffix: impl AsRef<str>,
892    ) -> Result<crate::RuntimeInvocation, crate::RuntimeError> {
893        let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
894            crate::RuntimeError::new(
895                "durable_effects_missing_call_id",
896                "durable effects require a prepared tool call id",
897            )
898        })?;
899        let effect_id_suffix = effect_id_suffix.into();
900        if let Some(parent) = self.context.parent_invocation.as_ref() {
901            return Ok(crate::runtime::causal::child_effect_invocation(
902                parent,
903                format!("{tool_call_id}:{effect_id_suffix}"),
904                kind,
905                replay_suffix,
906            ));
907        }
908        let scoped = self.context.effect_controller.scoped();
909        let replay_key = format!(
910            "{}:tool:{tool_call_id}:{}",
911            scoped.scope_id(),
912            replay_suffix.as_ref()
913        );
914        Ok(crate::RuntimeInvocation::effect(
915            crate::RuntimeScope::new(self.context.session_id.clone()),
916            format!("{tool_call_id}:{effect_id_suffix}"),
917            kind,
918            replay_key,
919        ))
920    }
921}
922
/// Runtime-prepared executable tool call.
///
/// The raw model/provider identity remains visible, but any argument rewrites
/// and provider-owned context projections are frozen before the call crosses a
/// runtime effect or process boundary.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PreparedToolCall {
    /// Identifier of this call; `identity` copies it from the pending call's
    /// `call_id`.
    pub call_id: String,
    /// Identifier of the tool this call targets.
    pub tool_id: ToolId,
    /// Name of the tool; `identity` copies it from the pending call's
    /// `tool_name`.
    pub tool_name: String,
    /// JSON arguments for the call.
    pub args: serde_json::Value,
    /// Optional provider replay metadata. Omitted from the serialized form
    /// when `None`, and defaulted when the field is absent on input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub replay: Option<ProviderReplayMeta>,
    /// Extra prepared payload; `identity` leaves it as `Value::Null`. A null
    /// value is omitted from the serialized form, and the field is defaulted
    /// when absent on input.
    #[serde(default, skip_serializing_if = "serde_json::Value::is_null")]
    pub prepared_payload: serde_json::Value,
}
939
940impl PreparedToolCall {
941    pub fn identity(tool_id: ToolId, call: crate::sansio::PendingToolCall) -> Self {
942        Self {
943            call_id: call.call_id,
944            tool_id,
945            tool_name: call.tool_name,
946            args: call.args,
947            replay: call.replay,
948            prepared_payload: serde_json::Value::Null,
949        }
950    }
951
952    pub fn from_parts(
953        call_id: impl Into<String>,
954        tool_id: impl Into<ToolId>,
955        tool_name: impl Into<String>,
956        args: serde_json::Value,
957        replay: Option<ProviderReplayMeta>,
958        prepared_payload: serde_json::Value,
959    ) -> Self {
960        Self {
961            call_id: call_id.into(),
962            tool_id: tool_id.into(),
963            tool_name: tool_name.into(),
964            args,
965            replay,
966            prepared_payload,
967        }
968    }
969}
970
/// One ordered child inside a runtime-prepared tool batch.
///
/// The call itself carries the executable provider payload. `replay_suffix`
/// is the deterministic suffix used for child effects such as retry sleeps or
/// pending completion awaits when the batch is the durable parent.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PreparedToolBatchCall {
    /// The prepared call to execute.
    pub call: PreparedToolCall,
    /// Replay suffix for this child; `PreparedToolBatch::new` sets it to
    /// `child:<index>:<call_id>`.
    pub replay_suffix: String,
}
981
/// Runtime-prepared executable tool batch.
///
/// The vector order is source order. Scheduling may run parallel-safe tools
/// concurrently, but launches and pending completion consumption are projected
/// back through this order.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PreparedToolBatch {
    /// Identifier of the batch.
    pub batch_id: String,
    /// Child calls, in source order.
    pub calls: Vec<PreparedToolBatchCall>,
}
992
993impl PreparedToolBatch {
994    pub fn new(batch_id: impl Into<String>, calls: Vec<PreparedToolCall>) -> Self {
995        let batch_id = batch_id.into();
996        let calls = calls
997            .into_iter()
998            .enumerate()
999            .map(|(index, call)| PreparedToolBatchCall {
1000                replay_suffix: format!("child:{index}:{}", call.call_id),
1001                call,
1002            })
1003            .collect();
1004        Self { batch_id, calls }
1005    }
1006
1007    pub fn is_empty(&self) -> bool {
1008        self.calls.is_empty()
1009    }
1010
1011    pub fn len(&self) -> usize {
1012        self.calls.len()
1013    }
1014}
1015
/// Context made available while a tool call is being prepared.
///
/// It exposes the session id, the turn context, the optional tool call id,
/// and session-service lookups (snapshot and tool catalog) for that session.
#[derive(Clone)]
pub struct ToolPrepareContext {
    // Session every lookup on `sessions` is issued for.
    session_id: String,
    // Session state service backing the snapshot and tool-catalog accessors.
    sessions: Arc<dyn SessionStateService>,
    // Turn context; also the source of typed plugin inputs.
    turn_context: crate::TurnContext,
    // Id of the tool call being prepared, when one is known.
    tool_call_id: Option<String>,
}
1023
1024impl ToolPrepareContext {
1025    pub(crate) fn new(
1026        session_id: String,
1027        sessions: Arc<dyn SessionStateService>,
1028        turn_context: crate::TurnContext,
1029        tool_call_id: Option<String>,
1030    ) -> Self {
1031        Self {
1032            session_id,
1033            sessions,
1034            turn_context,
1035            tool_call_id,
1036        }
1037    }
1038
1039    pub fn session_id(&self) -> &str {
1040        &self.session_id
1041    }
1042
1043    pub fn tool_call_id(&self) -> Option<&str> {
1044        self.tool_call_id.as_deref()
1045    }
1046
1047    pub fn turn_context(&self) -> &crate::TurnContext {
1048        &self.turn_context
1049    }
1050
1051    pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
1052    where
1053        T: 'static,
1054    {
1055        self.turn_context.plugin_input::<T>(plugin_id)
1056    }
1057
1058    pub async fn session_snapshot(&self) -> Result<SessionSnapshot, PluginError> {
1059        self.sessions.snapshot_session(&self.session_id).await
1060    }
1061
1062    pub async fn tool_catalog(&self) -> Result<Vec<serde_json::Value>, PluginError> {
1063        self.sessions.tool_catalog(&self.session_id).await
1064    }
1065
1066    pub async fn shared_tool_catalog(
1067        &self,
1068    ) -> Result<std::sync::Arc<Vec<serde_json::Value>>, PluginError> {
1069        self.sessions.shared_tool_catalog(&self.session_id).await
1070    }
1071}
1072
/// Inputs handed to [`ToolProvider::prepare_tool_call`].
pub struct ToolPrepareCall<'a> {
    /// Id of the tool the pending call targets.
    pub tool_id: ToolId,
    /// The pending call to prepare; the default `prepare_tool_call` passes it
    /// to `PreparedToolCall::identity` together with `tool_id`.
    pub pending: crate::sansio::PendingToolCall,
    /// Borrowed preparation context.
    pub context: &'a ToolPrepareContext,
}
1079
/// Per-call inputs handed to [`ToolProvider::execute`].
///
/// Fields are `pub` because `ToolCall` is a transient borrow; consumers
/// typically destructure (`let ToolCall { name, args, .. } = call`). The
/// stable surface lives on [`ToolContext`] (sealed) and the runtime's
/// dispatcher, which constructs `ToolCall` values.
pub struct ToolCall<'a> {
    /// Name of the tool to run; `execute_by_id` fills it from the resolved
    /// manifest's `name`.
    pub name: &'a str,
    /// JSON arguments for the call.
    pub args: &'a serde_json::Value,
    /// Tool context for this call.
    pub context: &'a ToolContext<'a>,
    /// Optional sender for streaming progress messages.
    pub progress: Option<&'a ProgressSender>,
}
1092
1093/// Trait for providing tools to the sandbox. Implement this per-project.
1094///
1095/// Implementations supply cheap [`ToolManifest`]s, lazily resolved
1096/// [`ToolContract`]s, and a single
1097/// [`execute`](Self::execute) method that handles every call. Tools that
1098/// need session state read it from `call.context`; tools that stream
1099/// progress send through `call.progress`.
1100#[async_trait::async_trait]
1101pub trait ToolProvider: Send + Sync + 'static {
1102    fn tool_manifests(&self) -> Vec<ToolManifest>;
1103    fn resolve_manifest(&self, name: &str) -> Option<ToolManifest> {
1104        self.tool_manifests()
1105            .into_iter()
1106            .find(|manifest| manifest.name == name)
1107    }
1108    fn resolve_manifest_by_id(&self, id: &ToolId) -> Option<ToolManifest> {
1109        self.tool_manifests()
1110            .into_iter()
1111            .find(|manifest| manifest.id == *id)
1112    }
1113    fn resolve_contract(&self, name: &str) -> Option<Arc<ToolContract>>;
1114    fn resolve_contract_by_id(&self, id: &ToolId) -> Option<Arc<ToolContract>> {
1115        let manifest = self.resolve_manifest_by_id(id)?;
1116        self.resolve_contract(&manifest.name)
1117    }
1118    async fn prepare_tool_call(
1119        &self,
1120        call: ToolPrepareCall<'_>,
1121    ) -> Result<PreparedToolCall, ToolResult> {
1122        Ok(PreparedToolCall::identity(call.tool_id, call.pending))
1123    }
1124    async fn execute(&self, call: ToolCall<'_>) -> ToolResult;
1125    async fn execute_by_id(
1126        &self,
1127        tool_id: &ToolId,
1128        args: &serde_json::Value,
1129        context: &ToolContext<'_>,
1130        progress: Option<&ProgressSender>,
1131    ) -> ToolResult {
1132        let Some(manifest) = self.resolve_manifest_by_id(tool_id) else {
1133            return ToolResult::err_fmt(format!("Unknown tool id: {tool_id}"));
1134        };
1135        self.execute(ToolCall {
1136            name: &manifest.name,
1137            args,
1138            context,
1139            progress,
1140        })
1141        .await
1142    }
1143}
1144
1145#[cfg(test)]
1146mod tests {
1147    use super::*;
1148    use crate::ProcessRegistry;
1149    use crate::RuntimeEffectController;
1150    use std::sync::atomic::{AtomicU64, Ordering};
1151
1152    struct NoDurableEffectController;
1153
1154    #[async_trait::async_trait]
1155    impl crate::RuntimeEffectController for NoDurableEffectController {
1156        async fn execute_effect(
1157            &self,
1158            _envelope: crate::RuntimeEffectEnvelope,
1159            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
1160        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
1161            Err(crate::RuntimeEffectControllerError::new(
1162                "unexpected_effect",
1163                "test controller should not execute effects",
1164            ))
1165        }
1166    }
1167
1168    fn test_context_with_controller(
1169        tool_call_id: Option<String>,
1170        controller: Arc<dyn crate::RuntimeEffectController>,
1171    ) -> ToolContext<'static> {
1172        ToolContext::builder(
1173            "session-1".to_string(),
1174            Arc::new(crate::testing::MockSessionManager::default()),
1175            Arc::new(crate::testing::MockSessionManager::default()),
1176            Arc::new(crate::testing::MockSessionManager::default()),
1177            Arc::new(crate::UnavailableProcessService),
1178            Arc::new(crate::DefaultProcessCancelAbility),
1179            crate::runtime::RuntimeEffectControllerHandle::shared(controller),
1180            Arc::new(crate::InMemoryAttachmentStore::new()),
1181            crate::DirectCompletionClient::unavailable(
1182                "direct completions are unavailable in this test context",
1183            ),
1184        )
1185        .tool_call_id(tool_call_id)
1186        .build()
1187    }
1188
1189    #[test]
1190    fn tool_context_builder_carries_call_payload_and_cancellation_state() {
1191        let cancellation = tokio_util::sync::CancellationToken::new();
1192        let prepared = PreparedToolCall::from_parts(
1193            "call-1",
1194            "tool:demo_tool",
1195            "demo_tool",
1196            serde_json::json!({ "input": true }),
1197            None,
1198            serde_json::json!({ "prepared": true }),
1199        );
1200
1201        let context = ToolContext::builder(
1202            "session-1".to_string(),
1203            Arc::new(crate::testing::MockSessionManager::default()),
1204            Arc::new(crate::testing::MockSessionManager::default()),
1205            Arc::new(crate::testing::MockSessionManager::default()),
1206            Arc::new(crate::UnavailableProcessService),
1207            Arc::new(crate::DefaultProcessCancelAbility),
1208            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
1209                crate::InlineRuntimeEffectController,
1210            )),
1211            Arc::new(crate::InMemoryAttachmentStore::new()),
1212            crate::DirectCompletionClient::unavailable(
1213                "direct completions are unavailable in this test context",
1214            ),
1215        )
1216        .prepared_call(&prepared)
1217        .cancellation_token(Some(cancellation.clone()))
1218        .async_process("process-1", cancellation.clone())
1219        .build();
1220
1221        assert_eq!(context.session_id(), "session-1");
1222        assert_eq!(context.tool_call_id(), Some("call-1"));
1223        assert_eq!(
1224            context.prepared_payload(),
1225            &serde_json::json!({ "prepared": true })
1226        );
1227        assert_eq!(context.async_process_id(), Some("process-1"));
1228        assert!(context.cancellation_token().is_some());
1229    }
1230
1231    #[test]
1232    fn durable_effects_requires_prepared_call_id_and_supporting_controller() {
1233        let missing_call =
1234            test_context_with_controller(None, Arc::new(crate::InlineRuntimeEffectController));
1235        let err = match missing_call.durable_effects() {
1236            Ok(_) => panic!("missing prepared tool call id should fail"),
1237            Err(err) => err,
1238        };
1239        assert_eq!(err.code.as_str(), "durable_effects_missing_call_id");
1240
1241        let unsupported = test_context_with_controller(
1242            Some("call-1".to_string()),
1243            Arc::new(NoDurableEffectController),
1244        );
1245        let err = match unsupported.durable_effects() {
1246            Ok(_) => panic!("unsupported controller should fail"),
1247            Err(err) => err,
1248        };
1249        assert_eq!(err.code.as_str(), "durable_effects_unavailable");
1250    }
1251
1252    #[tokio::test]
1253    async fn durable_run_json_executes_and_maps_closure_errors() {
1254        let context = test_context_with_controller(
1255            Some("call-run-json".to_string()),
1256            Arc::new(crate::InlineRuntimeEffectController),
1257        );
1258        let durable = context.durable_effects().expect("durable effects");
1259        let value = durable
1260            .run_json(
1261                "create",
1262                serde_json::json!({ "x": 1 }),
1263                |input| async move { Ok(serde_json::json!({ "seen": input["x"] })) },
1264            )
1265            .await
1266            .expect("durable step");
1267        assert_eq!(value, serde_json::json!({ "seen": 1 }));
1268
1269        let err = durable
1270            .run_json("fail", serde_json::json!({}), |_| async {
1271                Err(crate::RuntimeError::new(
1272                    "durable_step_failed",
1273                    "step failed",
1274                ))
1275            })
1276            .await
1277            .expect_err("closure error");
1278        assert_eq!(err.code.as_str(), "durable_step_failed");
1279        assert_eq!(err.message, "step failed");
1280    }
1281
1282    #[tokio::test]
1283    async fn durable_run_json_rejects_empty_or_duplicate_step_ids_before_running() {
1284        let context = test_context_with_controller(
1285            Some("call-step-ids".to_string()),
1286            Arc::new(crate::InlineRuntimeEffectController),
1287        );
1288        let durable = context.durable_effects().expect("durable effects");
1289        let runs = Arc::new(AtomicU64::new(0));
1290
1291        let err = durable
1292            .run_json("", serde_json::Value::Null, {
1293                let runs = Arc::clone(&runs);
1294                move |_| async move {
1295                    runs.fetch_add(1, Ordering::Relaxed);
1296                    Ok(serde_json::Value::Null)
1297                }
1298            })
1299            .await
1300            .expect_err("empty step id");
1301        assert_eq!(err.code.as_str(), "durable_effect_empty_step_id");
1302        assert_eq!(runs.load(Ordering::Relaxed), 0);
1303
1304        durable
1305            .run_json("same", serde_json::Value::Null, {
1306                let runs = Arc::clone(&runs);
1307                move |_| async move {
1308                    runs.fetch_add(1, Ordering::Relaxed);
1309                    Ok(serde_json::Value::Null)
1310                }
1311            })
1312            .await
1313            .expect("first step");
1314        let err = durable
1315            .run_json("same", serde_json::Value::Null, {
1316                let runs = Arc::clone(&runs);
1317                move |_| async move {
1318                    runs.fetch_add(1, Ordering::Relaxed);
1319                    Ok(serde_json::Value::Null)
1320                }
1321            })
1322            .await
1323            .expect_err("duplicate step id");
1324        assert_eq!(err.code.as_str(), "durable_effect_duplicate_step_id");
1325        assert_eq!(runs.load(Ordering::Relaxed), 1);
1326    }
1327
1328    #[tokio::test]
1329    async fn durable_external_event_key_is_custom_and_stable() {
1330        let context = test_context_with_controller(
1331            Some("call-event-key".to_string()),
1332            Arc::new(crate::InlineRuntimeEffectController),
1333        );
1334        let durable = context.durable_effects().expect("durable effects");
1335        let first = durable
1336            .external_event_key("tool-event-stable")
1337            .await
1338            .expect("first key");
1339        let second = durable
1340            .external_event_key("tool-event-stable")
1341            .await
1342            .expect("second key");
1343
1344        assert_eq!(first, second);
1345        assert_eq!(
1346            first.wait,
1347            crate::AwaitEventWaitIdentity::Custom {
1348                key: "tool-event-stable".to_string()
1349            }
1350        );
1351    }
1352
1353    #[tokio::test]
1354    async fn durable_await_event_json_maps_terminal_resolutions() {
1355        let controller = Arc::new(crate::InlineRuntimeEffectController);
1356        let context =
1357            test_context_with_controller(Some("call-await-event".to_string()), controller.clone());
1358        let durable = context.durable_effects().expect("durable effects");
1359
1360        let ok_key = durable
1361            .external_event_key("tool-event-ok")
1362            .await
1363            .expect("ok key");
1364        controller
1365            .resolve_await_event(
1366                &ok_key,
1367                crate::Resolution::Ok(serde_json::json!({ "answer": 42 })),
1368            )
1369            .await
1370            .expect("resolve ok");
1371        let value = durable
1372            .await_event_json(ok_key)
1373            .await
1374            .expect("await ok value");
1375        assert_eq!(value, serde_json::json!({ "answer": 42 }));
1376
1377        let err_key = durable
1378            .external_event_key("tool-event-err")
1379            .await
1380            .expect("err key");
1381        controller
1382            .resolve_await_event(
1383                &err_key,
1384                crate::Resolution::Err(crate::ExternalCompletionError::new(
1385                    "external_bad",
1386                    "external failed",
1387                )),
1388            )
1389            .await
1390            .expect("resolve err");
1391        let err = durable
1392            .await_event_json(err_key)
1393            .await
1394            .expect_err("await err value");
1395        assert_eq!(err.code.as_str(), "external_bad");
1396
1397        let cancelled_key = durable
1398            .external_event_key("tool-event-cancelled")
1399            .await
1400            .expect("cancelled key");
1401        controller
1402            .resolve_await_event(&cancelled_key, crate::Resolution::Cancelled)
1403            .await
1404            .expect("resolve cancelled");
1405        let err = durable
1406            .await_event_json(cancelled_key)
1407            .await
1408            .expect_err("await cancelled value");
1409        assert_eq!(err.code.as_str(), "durable_effect_event_cancelled");
1410
1411        let timeout_key = durable
1412            .external_event_key("tool-event-timeout")
1413            .await
1414            .expect("timeout key");
1415        controller
1416            .resolve_await_event(&timeout_key, crate::Resolution::Timeout)
1417            .await
1418            .expect("resolve timeout");
1419        let err = durable
1420            .await_event_json(timeout_key)
1421            .await
1422            .expect_err("await timeout value");
1423        assert_eq!(err.code.as_str(), "durable_effect_event_timeout");
1424    }
1425
1426    #[tokio::test]
1427    async fn durable_emit_process_event_requires_process_and_appends_inside_process() {
1428        let context = test_context_with_controller(
1429            Some("call-no-process".to_string()),
1430            Arc::new(crate::InlineRuntimeEffectController),
1431        );
1432        let err = context
1433            .durable_effects()
1434            .expect("durable effects")
1435            .emit_process_event("tool.event", serde_json::json!({}))
1436            .await
1437            .expect_err("outside process");
1438        assert_eq!(
1439            err.code.as_str(),
1440            "durable_effect_process_event_unavailable"
1441        );
1442
1443        let registry = Arc::new(crate::TestLocalProcessRegistry::default());
1444        let process_id = "process:durable-tool-event";
1445        registry
1446            .register_process(
1447                crate::ProcessRegistration::new(
1448                    process_id,
1449                    crate::ProcessInput::External {
1450                        metadata: serde_json::json!({}),
1451                    },
1452                    crate::ProcessProvenance::host(),
1453                )
1454                .with_extra_event_types([crate::ProcessEventType {
1455                    name: "tool.event".to_string(),
1456                    payload_schema: crate::LashSchema::any(),
1457                    semantics: crate::ProcessEventSemanticsSpec::default(),
1458                }]),
1459            )
1460            .await
1461            .expect("register process");
1462        let registry_dyn: Arc<dyn crate::ProcessRegistry> = registry;
1463        let context = test_context_with_controller(
1464            Some("call-process-event".to_string()),
1465            Arc::new(crate::InlineRuntimeEffectController),
1466        )
1467        .with_process_events_for_testing(process_id, registry_dyn);
1468
1469        let event = context
1470            .durable_effects()
1471            .expect("durable effects")
1472            .emit_process_event("tool.event", serde_json::json!({ "ok": true }))
1473            .await
1474            .expect("process event");
1475        assert_eq!(event.process_id, process_id);
1476        assert_eq!(event.event_type, "tool.event");
1477        assert_eq!(event.payload, serde_json::json!({ "ok": true }));
1478        assert_eq!(
1479            event.invocation.replay_key(),
1480            Some("tool:call-process-event:durable-process-event:0")
1481        );
1482
1483        let append_err = context
1484            .durable_effects()
1485            .expect("durable effects")
1486            .emit_process_event("undeclared.event", serde_json::json!({}))
1487            .await
1488            .expect_err("undeclared event type must fail the append");
1489        assert_eq!(
1490            append_err.code.as_str(),
1491            "durable_effect_process_event_append_failed"
1492        );
1493    }
1494}