Skip to main content

lash_core/
tool_provider.rs

1use std::collections::HashSet;
2use std::future::Future;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5
6use lash_sansio::llm::types::ProviderReplayMeta;
7use serde::{Deserialize, Serialize};
8
9use crate::plugin::{
10    PluginError, SessionGraphService, SessionLifecycleService, SessionSnapshot, SessionStateService,
11};
12use crate::{AttachmentStore, ToolContract, ToolDefinition, ToolId, ToolManifest, ToolResult};
13
14mod attachments;
15mod direct_completion;
16mod dispatch;
17mod process;
18pub(crate) mod process_events;
19mod session;
20mod triggers;
21
22pub use attachments::ToolAttachmentClient;
23pub use direct_completion::ToolDirectCompletionClient;
24pub use dispatch::ToolDispatchClient;
25pub use process::ToolSessionProcessAdmin;
26pub use process_events::ToolProcessEventClient;
27pub use session::{ToolSessionAdmin, ToolSessionModel};
28pub use triggers::ToolTriggerClient;
29
/// A message sent from the sandbox to the host during execution.
///
/// Values of this type are the items carried by [`ProgressSender`].
#[derive(Clone, Debug)]
pub struct SandboxMessage {
    /// Text content of the message.
    pub text: String,
    /// "tool_output" or another host-rendered progress event kind.
    pub kind: String,
}
37
/// Sender for streaming progress messages from tools (e.g. live bash output).
///
/// This is the sending half of an unbounded tokio MPSC channel whose items
/// are [`SandboxMessage`] values.
pub type ProgressSender = tokio::sync::mpsc::UnboundedSender<SandboxMessage>;
40
/// Slot holding the completion key recorded for a tool call.
///
/// The key lives behind an `Arc<Mutex<..>>`, so clones of this state share
/// the same slot.
#[derive(Clone, Default)]
pub(crate) struct ToolCompletionState {
    /// The recorded key; `None` until a key is stored and again after it is taken.
    key: Arc<Mutex<Option<crate::AwaitEventKey>>>,
}
45
46impl ToolCompletionState {
47    fn store(
48        &self,
49        key: crate::AwaitEventKey,
50    ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
51        let mut guard = self.key.lock().map_err(|_| {
52            crate::RuntimeError::new(
53                "tool_completion_state_poisoned",
54                "tool completion key state lock poisoned",
55            )
56        })?;
57        if let Some(existing) = guard.as_ref() {
58            return Ok(existing.clone());
59        }
60        *guard = Some(key.clone());
61        Ok(key)
62    }
63
64    pub(crate) fn take(&self) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
65        self.key.lock().map(|mut guard| guard.take()).map_err(|_| {
66            crate::RuntimeError::new(
67                "tool_completion_state_poisoned",
68                "tool completion key state lock poisoned",
69            )
70        })
71    }
72}
73
/// Bookkeeping for the durable effects issued by one tool call.
///
/// Both fields sit behind `Arc`, so clones of this state share the same
/// step-id set and the same counter.
#[derive(Clone, Default)]
pub(crate) struct ToolDurableEffectState {
    /// Step ids that have already been reserved.
    step_ids: Arc<Mutex<HashSet<String>>>,
    /// Counter handed out by `next_process_event_sequence`.
    process_event_sequence: Arc<AtomicU64>,
}
79
80impl ToolDurableEffectState {
81    fn reserve_step(&self, step_id: &str) -> Result<(), crate::RuntimeError> {
82        let mut guard = self.step_ids.lock().map_err(|_| {
83            crate::RuntimeError::new(
84                "durable_effect_state_poisoned",
85                "durable effect step state lock poisoned",
86            )
87        })?;
88        if !guard.insert(step_id.to_string()) {
89            return Err(crate::RuntimeError::new(
90                "durable_effect_duplicate_step_id",
91                format!("durable effect step id `{step_id}` was already used by this tool call"),
92            ));
93        }
94        Ok(())
95    }
96
97    fn next_process_event_sequence(&self) -> u64 {
98        self.process_event_sequence.fetch_add(1, Ordering::Relaxed)
99    }
100}
101
/// Per-call environment for [`ToolProvider::execute`]. Fields are sealed so
/// the runtime can add capabilities without breaking tool authors.
///
/// All fields are `pub(crate)`; callers outside the crate go through the
/// accessor and client methods on this type.
#[derive(Clone)]
pub struct ToolContext<'run> {
    /// Session identifier, exposed through `session_id()`.
    pub(crate) session_id: String,
    /// Agent frame identifier, exposed through `agent_frame_id()`.
    pub(crate) agent_frame_id: crate::AgentFrameId,
    pub(crate) sessions: Arc<dyn SessionStateService>,
    pub(crate) session_lifecycle: Arc<dyn SessionLifecycleService>,
    pub(crate) processes: Arc<dyn crate::ProcessService>,
    pub(crate) process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
    /// Dispatch context this tool context was derived from, when there is one.
    pub(crate) runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
    pub(crate) runtime_execution_context: Option<crate::RuntimeExecutionContext<'run>>,
    pub(crate) cancellation_token: Option<tokio_util::sync::CancellationToken>,
    /// First choice when resolving `runtime_process_id()`.
    pub(crate) async_process_id: Option<String>,
    /// Second choice when resolving `runtime_process_id()`.
    pub(crate) runtime_process_id: Option<String>,
    /// Backing context for `process_events()`; also the last fallback for
    /// `runtime_process_id()`.
    pub(crate) process_events: Option<ToolProcessEventContext>,
    pub(crate) attachment_store: Arc<dyn AttachmentStore>,
    pub(crate) direct_completions: crate::DirectCompletionClient<'run>,
    /// JSON payload exposed through `prepared_payload()`; `Null` by default.
    pub(crate) prepared_payload: serde_json::Value,
    /// JSON binding exposed through `tool_execution_binding()`; `Null` by default.
    pub(crate) tool_execution_binding: serde_json::Value,
    /// The id of the in-flight tool call that is invoking this tool.
    pub(crate) tool_call_id: Option<String>,
    /// Attempt counter; the builder starts it at 1 and `with_retry_context`
    /// clamps it to at least 1.
    pub(crate) attempt_number: u32,
    /// Attempt limit; the builder starts it at 1 and `with_retry_context`
    /// clamps it to at least 1.
    pub(crate) max_attempts: u32,
    /// Set by `with_retry_context`; `None` when there is no tool call id.
    pub(crate) replay_key: Option<String>,
    /// Slot filled by `completion_key()` and drained by `take_completion_key()`.
    pub(crate) completion: ToolCompletionState,
    /// Durable-effect bookkeeping (reserved step ids, event sequence counter).
    pub(crate) durable_effects: ToolDurableEffectState,
    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
    pub(crate) execution_env_spec: crate::ProcessExecutionEnvSpec,
    /// Optional callback invoked by `emit_child_process_started`.
    pub(crate) child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
}
134
/// Event payload describing a child process that has just been started.
///
/// Passed to the callback held by `ToolChildExecutionTraceHook`. `Debug` is
/// derived so the event can be logged or shown in test failures.
#[derive(Clone, Debug)]
pub struct ToolChildProcessStarted {
    /// Identifier of the started child process.
    pub process_id: String,
    /// Entry name of the child, when the emitter supplied one.
    pub child_entry_name: Option<String>,
}
140
/// Cloneable handle around a callback that receives
/// [`ToolChildProcessStarted`] events.
///
/// The callback is stored in an `Arc`, so clones invoke the same closure.
#[derive(Clone)]
pub struct ToolChildExecutionTraceHook {
    /// Callback run by `child_process_started`.
    on_child_process_started: Arc<dyn Fn(ToolChildProcessStarted) + Send + Sync>,
}
145
146impl ToolChildExecutionTraceHook {
147    pub fn new(
148        on_child_process_started: impl Fn(ToolChildProcessStarted) + Send + Sync + 'static,
149    ) -> Self {
150        Self {
151            on_child_process_started: Arc::new(on_child_process_started),
152        }
153    }
154
155    pub fn child_process_started(&self, event: ToolChildProcessStarted) {
156        (self.on_child_process_started)(event);
157    }
158}
159
/// Dependencies bundled for the process-event client of a tool call.
///
/// Built by `ToolContextBuilder::process_events` (and by the testing helper
/// on `ToolContext`) and handed to `ToolProcessEventClient`.
#[derive(Clone)]
pub(crate) struct ToolProcessEventContext {
    /// Process id; also the last fallback of `ToolContext::runtime_process_id`.
    process_id: String,
    registry: Arc<dyn crate::ProcessRegistry>,
    awaiter: crate::ProcessAwaiter,
    /// Optional persistence backend; `None` in the testing helper.
    store: Option<Arc<dyn crate::RuntimePersistence>>,
    /// Optional session store factory; `None` in the testing helper.
    session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
    session_graph: Arc<dyn SessionGraphService>,
    /// Optional queued-work driver; `None` in the testing helper.
    queued_work_driver: Option<crate::QueuedWorkDriver>,
}
170
/// Staging area for assembling a [`ToolContext`].
///
/// Mirrors the fields of `ToolContext`, plus `session_graph`, which is only
/// used to populate the process-event context. The retry fields
/// (`attempt_number`, `max_attempts`, `replay_key`) are not staged here;
/// `build` fills them with defaults.
pub(crate) struct ToolContextBuilder<'run> {
    session_id: String,
    agent_frame_id: crate::AgentFrameId,
    sessions: Arc<dyn SessionStateService>,
    session_lifecycle: Arc<dyn SessionLifecycleService>,
    /// Cloned into the `ToolProcessEventContext` by `process_events`; not
    /// copied into the built `ToolContext` directly.
    session_graph: Arc<dyn SessionGraphService>,
    processes: Arc<dyn crate::ProcessService>,
    process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
    effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
    runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
    runtime_execution_context: Option<crate::RuntimeExecutionContext<'run>>,
    cancellation_token: Option<tokio_util::sync::CancellationToken>,
    async_process_id: Option<String>,
    runtime_process_id: Option<String>,
    process_events: Option<ToolProcessEventContext>,
    attachment_store: Arc<dyn AttachmentStore>,
    direct_completions: crate::DirectCompletionClient<'run>,
    prepared_payload: serde_json::Value,
    tool_execution_binding: serde_json::Value,
    tool_call_id: Option<String>,
    completion: ToolCompletionState,
    durable_effects: ToolDurableEffectState,
    parent_invocation: Option<crate::RuntimeInvocation>,
    execution_env_spec: crate::ProcessExecutionEnvSpec,
    child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
}
197
198impl<'run> ToolContextBuilder<'run> {
199    pub(crate) fn from_dispatch(
200        dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
201    ) -> Self {
202        Self {
203            session_id: dispatch.session_id.clone(),
204            agent_frame_id: dispatch.agent_frame_id.clone(),
205            sessions: Arc::clone(&dispatch.sessions),
206            session_lifecycle: Arc::clone(&dispatch.session_lifecycle),
207            session_graph: Arc::clone(&dispatch.session_graph),
208            processes: Arc::clone(&dispatch.processes),
209            process_cancel_ability: Arc::clone(&dispatch.process_cancel_ability),
210            effect_controller: dispatch.effect_controller.clone(),
211            runtime_dispatch: Some(Arc::clone(&dispatch)),
212            runtime_execution_context: None,
213            cancellation_token: None,
214            async_process_id: None,
215            runtime_process_id: None,
216            process_events: None,
217            attachment_store: Arc::clone(&dispatch.attachment_store),
218            direct_completions: dispatch.direct_completions.clone(),
219            prepared_payload: serde_json::Value::Null,
220            tool_execution_binding: serde_json::Value::Null,
221            tool_call_id: None,
222            completion: ToolCompletionState::default(),
223            durable_effects: ToolDurableEffectState::default(),
224            parent_invocation: dispatch.parent_invocation.clone(),
225            execution_env_spec: dispatch.execution_env_spec.clone(),
226            child_execution_trace_hook: None,
227        }
228    }
229
230    #[cfg(any(test, feature = "testing"))]
231    pub(crate) fn tool_call_id(mut self, tool_call_id: impl Into<Option<String>>) -> Self {
232        self.tool_call_id = tool_call_id.into();
233        self
234    }
235
236    pub(crate) fn prepared_call(mut self, call: &PreparedToolCall) -> Self {
237        self.tool_call_id = Some(call.call_id.clone());
238        self.prepared_payload = call.prepared_payload.clone();
239        self
240    }
241
242    pub(crate) fn tool_execution_binding(mut self, binding: serde_json::Value) -> Self {
243        self.tool_execution_binding = binding;
244        self
245    }
246
247    pub(crate) fn cancellation_token(
248        mut self,
249        cancellation_token: Option<tokio_util::sync::CancellationToken>,
250    ) -> Self {
251        self.cancellation_token = cancellation_token;
252        self
253    }
254
255    pub(crate) fn runtime_execution_context(
256        mut self,
257        context: crate::RuntimeExecutionContext<'run>,
258    ) -> Self {
259        self.runtime_execution_context = Some(context);
260        self
261    }
262
263    pub(crate) fn runtime_process_id(mut self, process_id: Option<String>) -> Self {
264        self.runtime_process_id = process_id;
265        self
266    }
267
268    pub(crate) fn async_process(
269        mut self,
270        process_id: impl Into<String>,
271        cancellation_token: tokio_util::sync::CancellationToken,
272    ) -> Self {
273        self.async_process_id = Some(process_id.into());
274        self.cancellation_token = Some(cancellation_token);
275        self
276    }
277
278    pub(crate) fn process_events(
279        mut self,
280        process_id: impl Into<String>,
281        registry: Arc<dyn crate::ProcessRegistry>,
282        awaiter: crate::ProcessAwaiter,
283        store: Option<Arc<dyn crate::RuntimePersistence>>,
284        session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
285        queued_work_driver: Option<crate::QueuedWorkDriver>,
286    ) -> Self {
287        self.process_events = Some(ToolProcessEventContext {
288            process_id: process_id.into(),
289            registry,
290            awaiter,
291            store,
292            session_store_factory,
293            session_graph: Arc::clone(&self.session_graph),
294            queued_work_driver,
295        });
296        self
297    }
298
299    pub(crate) fn parent_invocation(mut self, metadata: Option<crate::RuntimeInvocation>) -> Self {
300        self.parent_invocation = metadata;
301        self
302    }
303
304    pub(crate) fn child_execution_trace_hook(
305        mut self,
306        hook: Option<ToolChildExecutionTraceHook>,
307    ) -> Self {
308        self.child_execution_trace_hook = hook;
309        self
310    }
311
312    pub(crate) fn build(self) -> ToolContext<'run> {
313        ToolContext {
314            session_id: self.session_id,
315            agent_frame_id: self.agent_frame_id,
316            sessions: self.sessions,
317            session_lifecycle: self.session_lifecycle,
318            processes: self.processes,
319            process_cancel_ability: self.process_cancel_ability,
320            effect_controller: self.effect_controller,
321            runtime_dispatch: self.runtime_dispatch,
322            runtime_execution_context: self.runtime_execution_context,
323            cancellation_token: self.cancellation_token,
324            async_process_id: self.async_process_id,
325            runtime_process_id: self.runtime_process_id,
326            process_events: self.process_events,
327            attachment_store: self.attachment_store,
328            direct_completions: self.direct_completions,
329            prepared_payload: self.prepared_payload,
330            tool_execution_binding: self.tool_execution_binding,
331            tool_call_id: self.tool_call_id,
332            attempt_number: 1,
333            max_attempts: 1,
334            replay_key: None,
335            completion: self.completion,
336            durable_effects: self.durable_effects,
337            parent_invocation: self.parent_invocation,
338            execution_env_spec: self.execution_env_spec,
339            child_execution_trace_hook: self.child_execution_trace_hook,
340        }
341    }
342}
343
344impl<'run> ToolContext<'run> {
345    #[cfg(any(test, feature = "testing"))]
346    #[expect(
347        clippy::too_many_arguments,
348        reason = "testing constructor mirrors the sealed runtime tool context dependencies"
349    )]
350    pub(crate) fn builder(
351        session_id: String,
352        sessions: Arc<dyn SessionStateService>,
353        session_lifecycle: Arc<dyn SessionLifecycleService>,
354        session_graph: Arc<dyn SessionGraphService>,
355        processes: Arc<dyn crate::ProcessService>,
356        process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
357        effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
358        attachment_store: Arc<dyn AttachmentStore>,
359        direct_completions: crate::DirectCompletionClient<'run>,
360    ) -> ToolContextBuilder<'run> {
361        ToolContextBuilder {
362            session_id,
363            agent_frame_id: String::new(),
364            sessions,
365            session_lifecycle,
366            session_graph,
367            processes,
368            process_cancel_ability,
369            effect_controller,
370            runtime_dispatch: None,
371            runtime_execution_context: None,
372            cancellation_token: None,
373            async_process_id: None,
374            runtime_process_id: None,
375            process_events: None,
376            attachment_store,
377            direct_completions,
378            prepared_payload: serde_json::Value::Null,
379            tool_execution_binding: serde_json::Value::Null,
380            tool_call_id: None,
381            completion: ToolCompletionState::default(),
382            durable_effects: ToolDurableEffectState::default(),
383            parent_invocation: None,
384            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
385                crate::PluginOptions::default(),
386                crate::SessionPolicy::default(),
387            ),
388            child_execution_trace_hook: None,
389        }
390    }
391
392    pub(crate) fn from_dispatch(
393        dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
394    ) -> ToolContextBuilder<'run> {
395        ToolContextBuilder::from_dispatch(dispatch)
396    }
397
398    pub fn session_id(&self) -> &str {
399        &self.session_id
400    }
401
402    pub fn agent_frame_id(&self) -> &str {
403        &self.agent_frame_id
404    }
405
406    pub fn sessions(&self) -> ToolSessionAdmin<'run> {
407        ToolSessionAdmin {
408            session_id: self.session_id.clone(),
409            sessions: Arc::clone(&self.sessions),
410            session_lifecycle: Arc::clone(&self.session_lifecycle),
411            effect_controller: self.effect_controller.clone(),
412        }
413    }
414
415    pub fn dispatch(&self) -> ToolDispatchClient<'run> {
416        ToolDispatchClient {
417            context: self.clone(),
418        }
419    }
420
421    pub fn triggers(&self) -> ToolTriggerClient<'run> {
422        ToolTriggerClient {
423            context: self.clone(),
424        }
425    }
426
427    pub fn processes(&self) -> ToolSessionProcessAdmin<'run> {
428        ToolSessionProcessAdmin {
429            session_id: self.session_id.clone(),
430            agent_frame_id: self.agent_frame_id.clone(),
431            processes: Arc::clone(&self.processes),
432            process_cancel_ability: Arc::clone(&self.process_cancel_ability),
433            effect_controller: self.effect_controller.clone(),
434            parent_invocation: self.parent_invocation.clone(),
435            tool_call_id: self.tool_call_id.clone(),
436            execution_env_spec: self.execution_env_spec.clone(),
437        }
438    }
439
440    pub fn emit_child_process_started(
441        &self,
442        process_id: impl Into<String>,
443        child_entry_name: Option<String>,
444    ) {
445        let Some(hook) = &self.child_execution_trace_hook else {
446            return;
447        };
448        hook.child_process_started(ToolChildProcessStarted {
449            process_id: process_id.into(),
450            child_entry_name,
451        });
452    }
453
454    pub fn direct_completions(&self) -> ToolDirectCompletionClient<'run> {
455        ToolDirectCompletionClient {
456            session_id: self.session_id.clone(),
457            tool_call_id: self.tool_call_id.clone(),
458            direct_completions: self.direct_completions.clone(),
459            parent_invocation: self.parent_invocation.clone(),
460            parent_tool_attempt_is_durable: self.parent_invocation.as_ref().is_some_and(
461                |invocation| {
462                    invocation.effect_kind() == Some(crate::RuntimeEffectKind::ToolAttempt)
463                },
464            ) && self
465                .effect_controller
466                .controller()
467                .durability_tier()
468                == crate::DurabilityTier::Durable,
469        }
470    }
471
472    pub fn attachments(&self) -> ToolAttachmentClient {
473        ToolAttachmentClient {
474            store: Arc::clone(&self.attachment_store),
475        }
476    }
477
478    pub fn process_events(&self) -> ToolProcessEventClient {
479        ToolProcessEventClient {
480            context: self.process_events.clone(),
481        }
482    }
483
484    /// Borrow this tool call's durable effect boundary.
485    ///
486    /// This is available only while executing a prepared tool call under a
487    /// controller that explicitly supports durable tool effects. The returned
488    /// facade records JSON steps and await-event waits in the caller's existing
489    /// effect log; it does not expose the underlying workflow engine context.
490    pub fn durable_effects(&self) -> Result<ToolDurableEffects<'_, 'run>, crate::RuntimeError> {
491        let Some(tool_call_id) = self.tool_call_id.as_deref() else {
492            return Err(crate::RuntimeError::new(
493                "durable_effects_missing_call_id",
494                "durable effects require a prepared tool call id",
495            ));
496        };
497        if tool_call_id.trim().is_empty() {
498            return Err(crate::RuntimeError::new(
499                "durable_effects_missing_call_id",
500                "durable effects require a non-empty prepared tool call id",
501            ));
502        }
503        let scoped = self.effect_controller.scoped();
504        if !scoped.controller().supports_durable_effects() {
505            return Err(crate::RuntimeError::new(
506                "durable_effects_unavailable",
507                "this effect controller does not support durable tool effects",
508            ));
509        }
510        if self.parent_invocation.as_ref().is_some_and(|invocation| {
511            invocation.effect_kind() == Some(crate::RuntimeEffectKind::ToolAttempt)
512        }) && scoped.controller().durability_tier() == crate::DurabilityTier::Durable
513        {
514            return Err(crate::RuntimeError::new(
515                "durable_effects_unavailable_in_tool_attempt",
516                "durable tool sub-effects are not available inside a journaled tool attempt",
517            ));
518        }
519        Ok(ToolDurableEffects { context: self })
520    }
521
522    pub fn cancellation_token(&self) -> Option<&tokio_util::sync::CancellationToken> {
523        self.cancellation_token.as_ref()
524    }
525
526    pub fn async_process_id(&self) -> Option<&str> {
527        self.async_process_id.as_deref()
528    }
529
530    pub fn runtime_process_id(&self) -> Option<&str> {
531        self.async_process_id
532            .as_deref()
533            .or(self.runtime_process_id.as_deref())
534            .or_else(|| {
535                self.process_events
536                    .as_ref()
537                    .map(|context| context.process_id.as_str())
538            })
539    }
540
541    pub fn tool_call_id(&self) -> Option<&str> {
542        self.tool_call_id.as_deref()
543    }
544
545    pub fn prepared_payload(&self) -> &serde_json::Value {
546        &self.prepared_payload
547    }
548
549    pub fn tool_execution_binding(&self) -> &serde_json::Value {
550        &self.tool_execution_binding
551    }
552
553    pub fn decode_prepared_payload<T>(&self) -> Result<T, serde_json::Error>
554    where
555        T: serde::de::DeserializeOwned,
556    {
557        serde_json::from_value(self.prepared_payload.clone())
558    }
559
560    pub fn attempt_number(&self) -> u32 {
561        self.attempt_number
562    }
563
564    pub fn max_attempts(&self) -> u32 {
565        self.max_attempts
566    }
567
568    pub fn replay_key(&self) -> Option<&str> {
569        self.replay_key.as_deref()
570    }
571
572    /// Obtain the durable completion key for this call, required before returning
573    /// [`ToolResult::Pending`](crate::ToolResult::Pending).
574    ///
575    /// A tool that defers its outcome (waiting on a webhook, human approval, or another
576    /// service) calls this, hands the returned [`AwaitEventKey`](crate::AwaitEventKey)
577    /// to whatever will complete the work out-of-band, and then returns
578    /// `ToolResult::Pending(..)`. The key names the durable wait the runtime parks the
579    /// call on; the external resolver delivers the result against it later.
580    ///
581    /// The key is stored on the context and consumed by the dispatcher when the tool
582    /// returns `Pending`. Returning `Pending` without first calling this fails the call
583    /// with `pending_tool_missing_completion_key`. Calls made outside a prepared tool
584    /// invocation (no tool call id) fail with `tool_completion_key_missing_call_id`.
585    pub async fn completion_key(&self) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
586        let tool_call_id = self.tool_call_id.clone().ok_or_else(|| {
587            crate::RuntimeError::new(
588                "tool_completion_key_missing_call_id",
589                "completion keys require a prepared tool call id",
590            )
591        })?;
592        let scoped = self.effect_controller.scoped();
593        let key = scoped
594            .controller()
595            .await_event_key(
596                scoped.execution_scope(),
597                crate::AwaitEventWaitIdentity::tool_completion(tool_call_id),
598            )
599            .await?;
600        self.completion.store(key)
601    }
602
603    pub(crate) fn take_completion_key(
604        &self,
605    ) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
606        self.completion.take()
607    }
608
609    pub fn with_async_process(
610        mut self,
611        process_id: impl Into<String>,
612        cancellation_token: tokio_util::sync::CancellationToken,
613    ) -> Self {
614        self.async_process_id = Some(process_id.into());
615        self.runtime_process_id = self.async_process_id.clone();
616        self.cancellation_token = Some(cancellation_token);
617        self
618    }
619
620    #[cfg(any(test, feature = "testing"))]
621    #[doc(hidden)]
622    pub fn with_process_events_for_testing(
623        mut self,
624        process_id: impl Into<String>,
625        registry: Arc<dyn crate::ProcessRegistry>,
626    ) -> Self {
627        let awaiter = crate::ProcessAwaiter::polling(Arc::clone(&registry));
628        self.process_events = Some(ToolProcessEventContext {
629            process_id: process_id.into(),
630            registry,
631            awaiter,
632            store: None,
633            session_store_factory: None,
634            session_graph: Arc::new(crate::plugin::NoopSessionManager),
635            queued_work_driver: None,
636        });
637        self
638    }
639
640    pub(crate) fn with_retry_context(
641        mut self,
642        tool_name: &str,
643        attempt_number: u32,
644        max_attempts: u32,
645    ) -> Self {
646        self.attempt_number = attempt_number.max(1);
647        self.max_attempts = max_attempts.max(1);
648        self.replay_key = self
649            .tool_call_id
650            .as_ref()
651            .map(|call_id| format!("lash-tool:{}:{call_id}:{tool_name}", self.session_id));
652        self
653    }
654
655    pub(crate) fn with_prepared_payload(mut self, payload: serde_json::Value) -> Self {
656        self.prepared_payload = payload;
657        self
658    }
659
660    pub(crate) fn with_tool_execution_binding(mut self, binding: serde_json::Value) -> Self {
661        self.tool_execution_binding = binding;
662        self
663    }
664
665    /// Constructor reserved for `lash_core::testing` helpers. Do not call directly;
666    /// use [`lash_core::testing::mock_tool_context`] instead.
667    #[cfg(any(test, feature = "testing"))]
668    #[doc(hidden)]
669    #[expect(
670        clippy::too_many_arguments,
671        reason = "test-only constructor mirrors the sealed runtime tool context"
672    )]
673    pub fn __for_testing(
674        session_id: String,
675        sessions: Arc<dyn SessionStateService>,
676        session_lifecycle: Arc<dyn SessionLifecycleService>,
677        session_graph: Arc<dyn SessionGraphService>,
678        processes: Arc<dyn crate::ProcessService>,
679        attachment_store: Arc<dyn AttachmentStore>,
680        direct_completions: crate::DirectCompletionClient<'static>,
681        tool_call_id: Option<String>,
682    ) -> ToolContext<'static> {
683        ToolContext::builder(
684            session_id,
685            sessions,
686            session_lifecycle,
687            session_graph,
688            processes,
689            Arc::new(crate::DefaultProcessCancelAbility),
690            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
691                crate::InlineRuntimeEffectController,
692            )),
693            attachment_store,
694            direct_completions,
695        )
696        .tool_call_id(tool_call_id)
697        .build()
698    }
699
700    /// Constructor reserved for tests that need a custom process-cancel host
701    /// ability. Do not call directly; prefer public testing helpers when they
702    /// cover the case.
703    #[cfg(any(test, feature = "testing"))]
704    #[doc(hidden)]
705    #[expect(
706        clippy::too_many_arguments,
707        reason = "test-only constructor mirrors the sealed runtime context"
708    )]
709    pub fn __for_testing_with_process_cancel_ability(
710        session_id: String,
711        sessions: Arc<dyn SessionStateService>,
712        session_lifecycle: Arc<dyn SessionLifecycleService>,
713        session_graph: Arc<dyn SessionGraphService>,
714        processes: Arc<dyn crate::ProcessService>,
715        process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
716        attachment_store: Arc<dyn AttachmentStore>,
717        direct_completions: crate::DirectCompletionClient<'static>,
718        tool_call_id: Option<String>,
719    ) -> ToolContext<'static> {
720        ToolContext::builder(
721            session_id,
722            sessions,
723            session_lifecycle,
724            session_graph,
725            processes,
726            process_cancel_ability,
727            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
728                crate::InlineRuntimeEffectController,
729            )),
730            attachment_store,
731            direct_completions,
732        )
733        .tool_call_id(tool_call_id)
734        .build()
735    }
736}
737
/// Durable effect operations available to advanced in-process tools.
///
/// The facade borrows the caller's existing runtime effect boundary. It records
/// JSON-only local steps and awaits host-signed event keys without exposing
/// Restate, Temporal, or any other workflow-native context.
///
/// Obtained from `ToolContext::durable_effects`, which performs the
/// availability checks before handing out this borrow.
pub struct ToolDurableEffects<'ctx, 'run> {
    /// The tool context whose effect controller and bookkeeping are used.
    context: &'ctx ToolContext<'run>,
}
746
747impl<'ctx, 'run> ToolDurableEffects<'ctx, 'run> {
748    pub async fn run_json<F, Fut>(
749        &self,
750        step_id: impl Into<String>,
751        input: serde_json::Value,
752        run: F,
753    ) -> Result<serde_json::Value, crate::RuntimeError>
754    where
755        F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
756        Fut: Future<Output = Result<serde_json::Value, crate::RuntimeError>> + Send + 'run,
757    {
758        let step_id = step_id.into();
759        if step_id.trim().is_empty() {
760            return Err(crate::RuntimeError::new(
761                "durable_effect_empty_step_id",
762                "durable effect step id must be non-empty",
763            ));
764        }
765        self.context.durable_effects.reserve_step(&step_id)?;
766        let invocation = self.step_invocation(
767            format!("durable-step:{step_id}"),
768            crate::RuntimeEffectKind::DurableStep,
769            format!("durable-step:{step_id}"),
770        )?;
771        let outcome = self
772            .context
773            .effect_controller
774            .controller()
775            .execute_effect(
776                crate::RuntimeEffectEnvelope::new(
777                    invocation,
778                    crate::RuntimeEffectCommand::DurableStep {
779                        step_id,
780                        input: input.clone(),
781                    },
782                ),
783                crate::RuntimeEffectLocalExecutor::durable_step(run),
784            )
785            .await
786            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
787        outcome
788            .into_durable_step()
789            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)
790    }
791
792    pub async fn external_event_key(
793        &self,
794        key: impl Into<String>,
795    ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
796        let key = key.into();
797        if key.trim().is_empty() {
798            return Err(crate::RuntimeError::new(
799                "durable_effect_empty_event_key",
800                "durable effect external event key must be non-empty",
801            ));
802        }
803        let scoped = self.context.effect_controller.scoped();
804        scoped
805            .controller()
806            .await_event_key(
807                scoped.execution_scope(),
808                crate::AwaitEventWaitIdentity::Custom { key },
809            )
810            .await
811    }
812
813    pub async fn await_event_json(
814        &self,
815        key: crate::AwaitEventKey,
816    ) -> Result<serde_json::Value, crate::RuntimeError> {
817        let invocation = self.step_invocation(
818            format!("await-event:{}", key.key_id),
819            crate::RuntimeEffectKind::AwaitEvent,
820            format!("await-event:{}", key.key_id),
821        )?;
822        let cancellation = self.context.cancellation_token.clone().unwrap_or_default();
823        let clock = self
824            .context
825            .runtime_dispatch
826            .as_ref()
827            .map(|dispatch| Arc::clone(&dispatch.clock))
828            .unwrap_or_else(|| Arc::new(crate::SystemClock));
829        let outcome = self
830            .context
831            .effect_controller
832            .controller()
833            .execute_effect(
834                crate::RuntimeEffectEnvelope::new(
835                    invocation,
836                    crate::RuntimeEffectCommand::AwaitEvent { key },
837                ),
838                crate::RuntimeEffectLocalExecutor::await_event_with_clock(
839                    cancellation,
840                    None,
841                    clock,
842                ),
843            )
844            .await
845            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
846        match outcome
847            .into_await_event()
848            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?
849        {
850            crate::Resolution::Ok(value) => Ok(value),
851            crate::Resolution::Err(err) => Err(crate::RuntimeError::new(err.code, err.message)),
852            crate::Resolution::Timeout => Err(crate::RuntimeError::new(
853                "durable_effect_event_timeout",
854                "durable effect external event wait timed out",
855            )),
856            crate::Resolution::Cancelled => Err(crate::RuntimeError::new(
857                "durable_effect_event_cancelled",
858                "durable effect external event wait was cancelled",
859            )),
860        }
861    }
862
863    pub async fn emit_process_event(
864        &self,
865        event_type: impl Into<String>,
866        payload: serde_json::Value,
867    ) -> Result<crate::ProcessEvent, crate::RuntimeError> {
868        let Some(process) = self.context.process_events.as_ref() else {
869            return Err(crate::RuntimeError::new(
870                "durable_effect_process_event_unavailable",
871                "durable effect process events are unavailable outside a durable process",
872            ));
873        };
874        let event_type = event_type.into();
875        if event_type.trim().is_empty() {
876            return Err(crate::RuntimeError::new(
877                "durable_effect_empty_process_event_type",
878                "durable effect process event type must be non-empty",
879            ));
880        }
881        let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
882            crate::RuntimeError::new(
883                "durable_effects_missing_call_id",
884                "durable effects require a prepared tool call id",
885            )
886        })?;
887        let sequence = self.context.durable_effects.next_process_event_sequence();
888        let request = crate::ProcessEventAppendRequest::new(event_type, payload).with_replay_key(
889            format!("tool:{tool_call_id}:durable-process-event:{sequence}"),
890        );
891        self.context
892            .process_events()
893            .emit_request(request)
894            .await
895            .map_err(|err| {
896                crate::RuntimeError::new(
897                    "durable_effect_process_event_append_failed",
898                    err.to_string(),
899                )
900            })
901            .and_then(|event| {
902                if event.process_id == process.process_id {
903                    Ok(event)
904                } else {
905                    Err(crate::RuntimeError::new(
906                        "durable_effect_process_event_process_mismatch",
907                        "process event append returned an event for a different process",
908                    ))
909                }
910            })
911    }
912
913    fn step_invocation(
914        &self,
915        effect_id_suffix: impl Into<String>,
916        kind: crate::RuntimeEffectKind,
917        replay_suffix: impl AsRef<str>,
918    ) -> Result<crate::RuntimeInvocation, crate::RuntimeError> {
919        let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
920            crate::RuntimeError::new(
921                "durable_effects_missing_call_id",
922                "durable effects require a prepared tool call id",
923            )
924        })?;
925        let effect_id_suffix = effect_id_suffix.into();
926        if let Some(parent) = self.context.parent_invocation.as_ref() {
927            return Ok(crate::runtime::causal::child_effect_invocation(
928                parent,
929                format!("{tool_call_id}:{effect_id_suffix}"),
930                kind,
931                replay_suffix,
932            ));
933        }
934        let scoped = self.context.effect_controller.scoped();
935        let replay_key = format!(
936            "{}:tool:{tool_call_id}:{}",
937            scoped.scope_id(),
938            replay_suffix.as_ref()
939        );
940        Ok(crate::RuntimeInvocation::effect(
941            crate::RuntimeScope::new(self.context.session_id.clone()),
942            format!("{tool_call_id}:{effect_id_suffix}"),
943            kind,
944            replay_key,
945        ))
946    }
947}
948
949/// Runtime-prepared executable tool call.
950///
951/// The raw model/provider identity remains visible, but any argument rewrites
952/// and provider-owned context projections are frozen before the call crosses a
953/// runtime effect or process boundary.
954#[derive(Clone, Debug, Serialize, Deserialize)]
955pub struct PreparedToolCall {
956    pub call_id: String,
957    pub tool_id: ToolId,
958    pub tool_name: String,
959    pub args: serde_json::Value,
960    #[serde(default, skip_serializing_if = "Option::is_none")]
961    pub replay: Option<ProviderReplayMeta>,
962    #[serde(default, skip_serializing_if = "serde_json::Value::is_null")]
963    pub prepared_payload: serde_json::Value,
964}
965
966impl PreparedToolCall {
967    pub fn identity(tool_id: ToolId, call: crate::sansio::PendingToolCall) -> Self {
968        Self {
969            call_id: call.call_id,
970            tool_id,
971            tool_name: call.tool_name,
972            args: call.args,
973            replay: call.replay,
974            prepared_payload: serde_json::Value::Null,
975        }
976    }
977
978    pub fn from_parts(
979        call_id: impl Into<String>,
980        tool_id: impl Into<ToolId>,
981        tool_name: impl Into<String>,
982        args: serde_json::Value,
983        replay: Option<ProviderReplayMeta>,
984        prepared_payload: serde_json::Value,
985    ) -> Self {
986        Self {
987            call_id: call_id.into(),
988            tool_id: tool_id.into(),
989            tool_name: tool_name.into(),
990            args,
991            replay,
992            prepared_payload,
993        }
994    }
995}
996
997/// One ordered child inside a runtime-prepared tool batch.
998///
999/// The call itself carries the executable provider payload. `replay_suffix`
1000/// is the deterministic suffix used for child effects such as retry sleeps or
1001/// pending completion awaits when the batch is the durable parent.
1002#[derive(Clone, Debug, Serialize, Deserialize)]
1003pub struct PreparedToolBatchCall {
1004    pub call: PreparedToolCall,
1005    pub replay_suffix: String,
1006    #[serde(default, skip_serializing_if = "Option::is_none")]
1007    pub execution_grant: Option<Box<ToolExecutionGrant>>,
1008}
1009
1010/// Runtime-prepared executable tool batch.
1011///
1012/// The vector order is source order. Scheduling may run parallel-safe tools
1013/// concurrently, but launches and pending completion consumption are projected
1014/// back through this order.
1015#[derive(Clone, Debug, Serialize, Deserialize)]
1016pub struct PreparedToolBatch {
1017    pub batch_id: String,
1018    pub calls: Vec<PreparedToolBatchCall>,
1019}
1020
1021impl PreparedToolBatch {
1022    pub fn new(batch_id: impl Into<String>, calls: Vec<PreparedToolCall>) -> Self {
1023        let batch_id = batch_id.into();
1024        let calls = calls
1025            .into_iter()
1026            .enumerate()
1027            .map(|(index, call)| PreparedToolBatchCall {
1028                replay_suffix: format!("child:{index}:{}", call.call_id),
1029                call,
1030                execution_grant: None,
1031            })
1032            .collect();
1033        Self { batch_id, calls }
1034    }
1035
1036    pub fn new_with_grants(
1037        batch_id: impl Into<String>,
1038        calls: Vec<(PreparedToolCall, Option<ToolExecutionGrant>)>,
1039    ) -> Self {
1040        let batch_id = batch_id.into();
1041        let calls = calls
1042            .into_iter()
1043            .enumerate()
1044            .map(|(index, (call, execution_grant))| PreparedToolBatchCall {
1045                replay_suffix: format!("child:{index}:{}", call.call_id),
1046                call,
1047                execution_grant: execution_grant.map(Box::new),
1048            })
1049            .collect();
1050        Self { batch_id, calls }
1051    }
1052
1053    pub fn is_empty(&self) -> bool {
1054        self.calls.is_empty()
1055    }
1056
1057    pub fn len(&self) -> usize {
1058        self.calls.len()
1059    }
1060}
1061
1062/// Explicit authority to execute a tool outside Tool Catalog membership.
1063///
1064/// Normal tool calls are authorized by catalog membership. A grant is a
1065/// separate, caller-provided capability used by deferred resolution flows: it
1066/// carries the manifest/contract to validate the call plus an opaque host
1067/// execution binding that providers can inspect from the prepare and execute
1068/// contexts.
1069#[derive(Clone, Debug, Serialize, Deserialize)]
1070pub struct ToolExecutionGrant {
1071    /// Tool identity and model-facing metadata authorized by the grant.
1072    pub manifest: ToolManifest,
1073    /// Contract used to validate granted call arguments without consulting the
1074    /// current Tool Catalog.
1075    pub contract: Box<ToolContract>,
1076    /// Explicit registry source route for registry-backed execution. Direct
1077    /// non-registry providers may ignore this; [`ToolRegistry`] requires it.
1078    pub source_id: Option<String>,
1079    /// Opaque host routing payload passed to prepare and execute contexts.
1080    pub execution_binding: serde_json::Value,
1081}
1082
1083impl ToolExecutionGrant {
1084    pub fn new(manifest: ToolManifest, contract: ToolContract) -> Self {
1085        Self {
1086            manifest,
1087            contract: Box::new(contract),
1088            source_id: None,
1089            execution_binding: serde_json::Value::Null,
1090        }
1091    }
1092
1093    pub fn from_definition(definition: ToolDefinition) -> Self {
1094        Self::new(definition.manifest(), definition.contract())
1095    }
1096
1097    pub fn with_source_id(mut self, source_id: impl Into<String>) -> Self {
1098        self.source_id = Some(source_id.into());
1099        self
1100    }
1101
1102    pub fn with_execution_binding(mut self, execution_binding: serde_json::Value) -> Self {
1103        self.execution_binding = execution_binding;
1104        self
1105    }
1106}
1107
1108#[derive(Clone)]
1109pub struct ToolPrepareContext {
1110    session_id: String,
1111    sessions: Arc<dyn SessionStateService>,
1112    turn_context: crate::TurnContext,
1113    tool_call_id: Option<String>,
1114    tool_execution_binding: serde_json::Value,
1115}
1116
1117impl ToolPrepareContext {
1118    pub(crate) fn with_execution_binding(
1119        session_id: String,
1120        sessions: Arc<dyn SessionStateService>,
1121        turn_context: crate::TurnContext,
1122        tool_call_id: Option<String>,
1123        tool_execution_binding: serde_json::Value,
1124    ) -> Self {
1125        Self {
1126            session_id,
1127            sessions,
1128            turn_context,
1129            tool_call_id,
1130            tool_execution_binding,
1131        }
1132    }
1133
1134    pub fn session_id(&self) -> &str {
1135        &self.session_id
1136    }
1137
1138    pub fn tool_call_id(&self) -> Option<&str> {
1139        self.tool_call_id.as_deref()
1140    }
1141
1142    pub fn tool_execution_binding(&self) -> &serde_json::Value {
1143        &self.tool_execution_binding
1144    }
1145
1146    pub fn turn_context(&self) -> &crate::TurnContext {
1147        &self.turn_context
1148    }
1149
1150    pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
1151    where
1152        T: 'static,
1153    {
1154        self.turn_context.plugin_input::<T>(plugin_id)
1155    }
1156
1157    pub async fn session_snapshot(&self) -> Result<SessionSnapshot, PluginError> {
1158        self.sessions.snapshot_session(&self.session_id).await
1159    }
1160
1161    pub async fn tool_catalog(&self) -> Result<Vec<serde_json::Value>, PluginError> {
1162        self.sessions.tool_catalog(&self.session_id).await
1163    }
1164
1165    pub async fn shared_tool_catalog(
1166        &self,
1167    ) -> Result<std::sync::Arc<Vec<serde_json::Value>>, PluginError> {
1168        self.sessions.shared_tool_catalog(&self.session_id).await
1169    }
1170}
1171
/// Inputs handed to [`ToolProvider::prepare_tool_call`].
///
/// The default `prepare_tool_call` implementation forwards `tool_id` and
/// `pending` unchanged to [`PreparedToolCall::identity`].
pub struct ToolPrepareCall<'a> {
    /// Id of the tool the pending call targets.
    pub tool_id: ToolId,
    /// The pending call to prepare.
    pub pending: crate::sansio::PendingToolCall,
    /// Borrowed preparation context for this call.
    pub context: &'a ToolPrepareContext,
}
1178
/// Per-call inputs handed to [`ToolProvider::execute`].
///
/// Fields are `pub` because `ToolCall` is a transient borrow; consumers
/// typically destructure (`let ToolCall { name, args, .. } = call`). The
/// stable surface lives on [`ToolContext`] (sealed) and the runtime's
/// dispatcher, which constructs `ToolCall` values.
pub struct ToolCall<'a> {
    /// Name of the tool to execute.
    pub name: &'a str,
    /// JSON arguments for the call.
    pub args: &'a serde_json::Value,
    /// Tool context for this call.
    pub context: &'a ToolContext<'a>,
    /// Optional sender for streaming progress messages.
    pub progress: Option<&'a ProgressSender>,
}
1191
1192/// Trait for providing tools to the sandbox. Implement this per-project.
1193///
1194/// Implementations supply cheap [`ToolManifest`]s, lazily resolved
1195/// [`ToolContract`]s, and a single
1196/// [`execute`](Self::execute) method that handles every call. Tools that
1197/// need session state read it from `call.context`; tools that stream
1198/// progress send through `call.progress`.
1199#[async_trait::async_trait]
1200pub trait ToolProvider: Send + Sync + 'static {
1201    fn tool_manifests(&self) -> Vec<ToolManifest>;
1202    fn resolve_manifest(&self, name: &str) -> Option<ToolManifest> {
1203        self.tool_manifests()
1204            .into_iter()
1205            .find(|manifest| manifest.name == name)
1206    }
1207    fn resolve_manifest_by_id(&self, id: &ToolId) -> Option<ToolManifest> {
1208        self.tool_manifests()
1209            .into_iter()
1210            .find(|manifest| manifest.id == *id)
1211    }
1212    fn resolve_contract(&self, name: &str) -> Option<Arc<ToolContract>>;
1213    fn resolve_contract_by_id(&self, id: &ToolId) -> Option<Arc<ToolContract>> {
1214        let manifest = self.resolve_manifest_by_id(id)?;
1215        self.resolve_contract(&manifest.name)
1216    }
1217    async fn prepare_tool_call(
1218        &self,
1219        call: ToolPrepareCall<'_>,
1220    ) -> Result<PreparedToolCall, ToolResult> {
1221        Ok(PreparedToolCall::identity(call.tool_id, call.pending))
1222    }
1223    async fn prepare_granted_tool_call(
1224        &self,
1225        grant: &ToolExecutionGrant,
1226        call: ToolPrepareCall<'_>,
1227    ) -> Result<PreparedToolCall, ToolResult> {
1228        let _ = call;
1229        Err(ToolResult::err_fmt(format_args!(
1230            "Granted execution is unsupported for tool id `{}`",
1231            grant.manifest.id
1232        )))
1233    }
1234    async fn execute(&self, call: ToolCall<'_>) -> ToolResult;
1235    async fn execute_granted(
1236        &self,
1237        grant: &ToolExecutionGrant,
1238        args: &serde_json::Value,
1239        context: &ToolContext<'_>,
1240        progress: Option<&ProgressSender>,
1241    ) -> ToolResult {
1242        let _ = (args, context, progress);
1243        ToolResult::err_fmt(format_args!(
1244            "Granted execution is unsupported for tool id `{}`",
1245            grant.manifest.id
1246        ))
1247    }
1248    async fn execute_by_id(
1249        &self,
1250        tool_id: &ToolId,
1251        args: &serde_json::Value,
1252        context: &ToolContext<'_>,
1253        progress: Option<&ProgressSender>,
1254    ) -> ToolResult {
1255        let Some(manifest) = self.resolve_manifest_by_id(tool_id) else {
1256            return ToolResult::err_fmt(format!("Unknown tool id: {tool_id}"));
1257        };
1258        self.execute(ToolCall {
1259            name: &manifest.name,
1260            args,
1261            context,
1262            progress,
1263        })
1264        .await
1265    }
1266}
1267
#[cfg(test)]
mod tests {
    use super::*;
    use crate::AwaitEventResolver;
    use crate::ProcessRegistry;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Controller stub whose `execute_effect` always fails; used to exercise
    /// the "durable effects unavailable" path.
    struct NoDurableEffectController;

    impl AwaitEventResolver for NoDurableEffectController {}

    #[async_trait::async_trait]
    impl crate::RuntimeEffectController for NoDurableEffectController {
        async fn execute_effect(
            &self,
            _envelope: crate::RuntimeEffectEnvelope,
            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
            Err(crate::RuntimeEffectControllerError::new(
                "unexpected_effect",
                "test controller should not execute effects",
            ))
        }
    }

    /// Builds a tool context for `session-1` backed by mock services and the
    /// given effect controller.
    fn test_context_with_controller(
        tool_call_id: Option<String>,
        controller: Arc<dyn crate::RuntimeEffectController>,
    ) -> ToolContext<'static> {
        ToolContext::builder(
            "session-1".to_string(),
            Arc::new(crate::testing::MockSessionManager::default()),
            Arc::new(crate::testing::MockSessionManager::default()),
            Arc::new(crate::testing::MockSessionManager::default()),
            Arc::new(crate::UnavailableProcessService),
            Arc::new(crate::DefaultProcessCancelAbility),
            crate::runtime::RuntimeEffectControllerHandle::shared(controller),
            Arc::new(crate::InMemoryAttachmentStore::new()),
            crate::DirectCompletionClient::unavailable(
                "direct completions are unavailable in this test context",
            ),
        )
        .tool_call_id(tool_call_id)
        .build()
    }

    /// Shorthand for a context with the given call id and the inline effect
    /// controller.
    fn inline_context(call_id: &str) -> ToolContext<'static> {
        test_context_with_controller(
            Some(call_id.to_string()),
            Arc::new(crate::InlineRuntimeEffectController),
        )
    }

    #[test]
    fn tool_context_builder_carries_call_payload_and_cancellation_state() {
        let token = tokio_util::sync::CancellationToken::new();
        let prepared_call = PreparedToolCall::from_parts(
            "call-1",
            "tool:demo_tool",
            "demo_tool",
            serde_json::json!({ "input": true }),
            None,
            serde_json::json!({ "prepared": true }),
        );

        let ctx = ToolContext::builder(
            "session-1".to_string(),
            Arc::new(crate::testing::MockSessionManager::default()),
            Arc::new(crate::testing::MockSessionManager::default()),
            Arc::new(crate::testing::MockSessionManager::default()),
            Arc::new(crate::UnavailableProcessService),
            Arc::new(crate::DefaultProcessCancelAbility),
            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
                crate::InlineRuntimeEffectController,
            )),
            Arc::new(crate::InMemoryAttachmentStore::new()),
            crate::DirectCompletionClient::unavailable(
                "direct completions are unavailable in this test context",
            ),
        )
        .prepared_call(&prepared_call)
        .cancellation_token(Some(token.clone()))
        .async_process("process-1", token.clone())
        .build();

        assert_eq!(ctx.session_id(), "session-1");
        assert_eq!(ctx.tool_call_id(), Some("call-1"));
        assert_eq!(
            ctx.prepared_payload(),
            &serde_json::json!({ "prepared": true })
        );
        assert_eq!(ctx.async_process_id(), Some("process-1"));
        assert!(ctx.cancellation_token().is_some());
    }

    #[test]
    fn durable_effects_requires_prepared_call_id_and_supporting_controller() {
        // No tool call id on the context.
        let missing_call =
            test_context_with_controller(None, Arc::new(crate::InlineRuntimeEffectController));
        let Err(err) = missing_call.durable_effects() else {
            panic!("missing prepared tool call id should fail");
        };
        assert_eq!(err.code.as_str(), "durable_effects_missing_call_id");

        // Call id present, but the controller is the rejecting stub.
        let unsupported = test_context_with_controller(
            Some("call-1".to_string()),
            Arc::new(NoDurableEffectController),
        );
        let Err(err) = unsupported.durable_effects() else {
            panic!("unsupported controller should fail");
        };
        assert_eq!(err.code.as_str(), "durable_effects_unavailable");
    }

    #[tokio::test]
    async fn durable_run_json_executes_and_maps_closure_errors() {
        let ctx = inline_context("call-run-json");
        let durable = ctx.durable_effects().expect("durable effects");

        let output = durable
            .run_json(
                "create",
                serde_json::json!({ "x": 1 }),
                |input| async move { Ok(serde_json::json!({ "seen": input["x"] })) },
            )
            .await
            .expect("durable step");
        assert_eq!(output, serde_json::json!({ "seen": 1 }));

        let failure = durable
            .run_json("fail", serde_json::json!({}), |_| async {
                Err(crate::RuntimeError::new(
                    "durable_step_failed",
                    "step failed",
                ))
            })
            .await
            .expect_err("closure error");
        assert_eq!(failure.code.as_str(), "durable_step_failed");
        assert_eq!(failure.message, "step failed");
    }

    #[tokio::test]
    async fn durable_run_json_rejects_empty_or_duplicate_step_ids_before_running() {
        let ctx = inline_context("call-step-ids");
        let durable = ctx.durable_effects().expect("durable effects");
        // Counts how many times a step closure actually ran.
        let executions = Arc::new(AtomicU64::new(0));

        let empty_id_err = durable
            .run_json("", serde_json::Value::Null, {
                let executions = Arc::clone(&executions);
                move |_| async move {
                    executions.fetch_add(1, Ordering::Relaxed);
                    Ok(serde_json::Value::Null)
                }
            })
            .await
            .expect_err("empty step id");
        assert_eq!(empty_id_err.code.as_str(), "durable_effect_empty_step_id");
        assert_eq!(executions.load(Ordering::Relaxed), 0);

        durable
            .run_json("same", serde_json::Value::Null, {
                let executions = Arc::clone(&executions);
                move |_| async move {
                    executions.fetch_add(1, Ordering::Relaxed);
                    Ok(serde_json::Value::Null)
                }
            })
            .await
            .expect("first step");

        let duplicate_err = durable
            .run_json("same", serde_json::Value::Null, {
                let executions = Arc::clone(&executions);
                move |_| async move {
                    executions.fetch_add(1, Ordering::Relaxed);
                    Ok(serde_json::Value::Null)
                }
            })
            .await
            .expect_err("duplicate step id");
        assert_eq!(
            duplicate_err.code.as_str(),
            "durable_effect_duplicate_step_id"
        );
        // Only the first "same" step may have run.
        assert_eq!(executions.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn durable_external_event_key_is_custom_and_stable() {
        let ctx = inline_context("call-event-key");
        let durable = ctx.durable_effects().expect("durable effects");

        let first = durable
            .external_event_key("tool-event-stable")
            .await
            .expect("first key");
        let second = durable
            .external_event_key("tool-event-stable")
            .await
            .expect("second key");

        assert_eq!(first, second);
        let expected_wait = crate::AwaitEventWaitIdentity::Custom {
            key: "tool-event-stable".to_string(),
        };
        assert_eq!(first.wait, expected_wait);
    }

    #[tokio::test]
    async fn durable_await_event_json_maps_terminal_resolutions() {
        let controller = Arc::new(crate::InlineRuntimeEffectController);
        let ctx =
            test_context_with_controller(Some("call-await-event".to_string()), controller.clone());
        let durable = ctx.durable_effects().expect("durable effects");

        // Ok resolution: the payload is returned as-is.
        let ok_key = durable
            .external_event_key("tool-event-ok")
            .await
            .expect("ok key");
        controller
            .resolve_await_event(
                &ok_key,
                crate::Resolution::Ok(serde_json::json!({ "answer": 42 })),
            )
            .await
            .expect("resolve ok");
        let payload = durable
            .await_event_json(ok_key)
            .await
            .expect("await ok value");
        assert_eq!(payload, serde_json::json!({ "answer": 42 }));

        // Err resolution: the external error code is surfaced.
        let err_key = durable
            .external_event_key("tool-event-err")
            .await
            .expect("err key");
        let external_failure =
            crate::ExternalCompletionError::new("external_bad", "external failed");
        controller
            .resolve_await_event(&err_key, crate::Resolution::Err(external_failure))
            .await
            .expect("resolve err");
        let external_err = durable
            .await_event_json(err_key)
            .await
            .expect_err("await err value");
        assert_eq!(external_err.code.as_str(), "external_bad");

        // Cancelled resolution.
        let cancelled_key = durable
            .external_event_key("tool-event-cancelled")
            .await
            .expect("cancelled key");
        controller
            .resolve_await_event(&cancelled_key, crate::Resolution::Cancelled)
            .await
            .expect("resolve cancelled");
        let cancelled_err = durable
            .await_event_json(cancelled_key)
            .await
            .expect_err("await cancelled value");
        assert_eq!(
            cancelled_err.code.as_str(),
            "durable_effect_event_cancelled"
        );

        // Timeout resolution.
        let timeout_key = durable
            .external_event_key("tool-event-timeout")
            .await
            .expect("timeout key");
        controller
            .resolve_await_event(&timeout_key, crate::Resolution::Timeout)
            .await
            .expect("resolve timeout");
        let timeout_err = durable
            .await_event_json(timeout_key)
            .await
            .expect_err("await timeout value");
        assert_eq!(timeout_err.code.as_str(), "durable_effect_event_timeout");
    }

    #[tokio::test]
    async fn durable_emit_process_event_requires_process_and_appends_inside_process() {
        // Without process events attached the emit must be refused.
        let detached = inline_context("call-no-process");
        let unavailable = detached
            .durable_effects()
            .expect("durable effects")
            .emit_process_event("tool.event", serde_json::json!({}))
            .await
            .expect_err("outside process");
        assert_eq!(
            unavailable.code.as_str(),
            "durable_effect_process_event_unavailable"
        );

        // Register a process that declares the `tool.event` event type.
        let registry = Arc::new(crate::TestLocalProcessRegistry::default());
        let process_id = "process:durable-tool-event";
        let registration = crate::ProcessRegistration::new(
            process_id,
            crate::ProcessInput::External {
                metadata: serde_json::json!({}),
            },
            crate::RecoveryDisposition::ExternallyOwned,
            crate::ProcessProvenance::host(),
        )
        .with_extra_event_types([crate::ProcessEventType {
            name: "tool.event".to_string(),
            payload_schema: crate::LashSchema::any(),
            semantics: crate::ProcessEventSemanticsSpec::default(),
        }]);
        registry
            .register_process(registration)
            .await
            .expect("register process");
        let registry_dyn: Arc<dyn crate::ProcessRegistry> = registry;
        let attached = inline_context("call-process-event")
            .with_process_events_for_testing(process_id, registry_dyn);

        let event = attached
            .durable_effects()
            .expect("durable effects")
            .emit_process_event("tool.event", serde_json::json!({ "ok": true }))
            .await
            .expect("process event");
        assert_eq!(event.process_id, process_id);
        assert_eq!(event.event_type, "tool.event");
        assert_eq!(event.payload, serde_json::json!({ "ok": true }));
        assert_eq!(
            event.invocation.replay_key(),
            Some("tool:call-process-event:durable-process-event:0")
        );

        // An event type the process did not declare must fail the append.
        let append_err = attached
            .durable_effects()
            .expect("durable effects")
            .emit_process_event("undeclared.event", serde_json::json!({}))
            .await
            .expect_err("undeclared event type must fail the append");
        assert_eq!(
            append_err.code.as_str(),
            "durable_effect_process_event_append_failed"
        );
    }
}