Skip to main content

lash_core/
tool_provider.rs

1use std::collections::HashSet;
2use std::future::Future;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5
6use lash_sansio::llm::types::ProviderReplayMeta;
7use serde::{Deserialize, Serialize};
8
9use crate::plugin::{
10    PluginError, SessionGraphService, SessionLifecycleService, SessionSnapshot, SessionStateService,
11};
12use crate::{AttachmentStore, ToolContract, ToolDefinition, ToolId, ToolManifest, ToolResult};
13
14mod attachments;
15mod direct_completion;
16mod dispatch;
17mod process;
18pub(crate) mod process_events;
19mod session;
20mod triggers;
21
22pub use attachments::ToolAttachmentClient;
23pub use direct_completion::ToolDirectCompletionClient;
24pub use dispatch::ToolDispatchClient;
25pub use process::ToolSessionProcessAdmin;
26pub use process_events::ToolProcessEventClient;
27pub use session::{ToolSessionAdmin, ToolSessionModel};
28pub use triggers::ToolTriggerClient;
29
/// A progress message the sandbox sends to the host while execution is running.
#[derive(Debug, Clone)]
pub struct SandboxMessage {
    /// Message body.
    pub text: String,
    /// Event kind: `"tool_output"`, or another progress event kind rendered by the host.
    pub kind: String,
}
37
38/// Sender for streaming progress messages from tools (e.g. live bash output).
39pub type ProgressSender = tokio::sync::mpsc::UnboundedSender<SandboxMessage>;
40
41#[derive(Clone, Default)]
42pub(crate) struct ToolCompletionState {
43    key: Arc<Mutex<Option<crate::AwaitEventKey>>>,
44}
45
46impl ToolCompletionState {
47    fn store(
48        &self,
49        key: crate::AwaitEventKey,
50    ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
51        let mut guard = self.key.lock().map_err(|_| {
52            crate::RuntimeError::new(
53                "tool_completion_state_poisoned",
54                "tool completion key state lock poisoned",
55            )
56        })?;
57        if let Some(existing) = guard.as_ref() {
58            return Ok(existing.clone());
59        }
60        *guard = Some(key.clone());
61        Ok(key)
62    }
63
64    pub(crate) fn take(&self) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
65        self.key.lock().map(|mut guard| guard.take()).map_err(|_| {
66            crate::RuntimeError::new(
67                "tool_completion_state_poisoned",
68                "tool completion key state lock poisoned",
69            )
70        })
71    }
72}
73
/// Bookkeeping for durable effects: the step ids already used and a running
/// process-event sequence counter.
///
/// Both fields sit behind `Arc`s, so clones observe and update the same state.
#[derive(Default, Clone)]
pub(crate) struct ToolDurableEffectState {
    /// Step ids that have already been reserved.
    step_ids: Arc<Mutex<HashSet<String>>>,
    /// Monotonic counter handed out one value at a time; starts at zero.
    process_event_sequence: Arc<AtomicU64>,
}
79
80impl ToolDurableEffectState {
81    fn reserve_step(&self, step_id: &str) -> Result<(), crate::RuntimeError> {
82        let mut guard = self.step_ids.lock().map_err(|_| {
83            crate::RuntimeError::new(
84                "durable_effect_state_poisoned",
85                "durable effect step state lock poisoned",
86            )
87        })?;
88        if !guard.insert(step_id.to_string()) {
89            return Err(crate::RuntimeError::new(
90                "durable_effect_duplicate_step_id",
91                format!("durable effect step id `{step_id}` was already used by this tool call"),
92            ));
93        }
94        Ok(())
95    }
96
97    fn next_process_event_sequence(&self) -> u64 {
98        self.process_event_sequence.fetch_add(1, Ordering::Relaxed)
99    }
100}
101
102/// Per-call environment for [`ToolProvider::execute`]. Fields are sealed so
103/// the runtime can add capabilities without breaking tool authors.
104#[derive(Clone)]
105pub struct ToolContext<'run> {
106    pub(crate) session_id: String,
107    pub(crate) agent_frame_id: crate::AgentFrameId,
108    pub(crate) sessions: Arc<dyn SessionStateService>,
109    pub(crate) session_lifecycle: Arc<dyn SessionLifecycleService>,
110    pub(crate) processes: Arc<dyn crate::ProcessService>,
111    pub(crate) process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
112    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
113    pub(crate) runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
114    pub(crate) runtime_execution_context: Option<crate::RuntimeExecutionContext<'run>>,
115    pub(crate) cancellation_token: Option<tokio_util::sync::CancellationToken>,
116    pub(crate) async_process_id: Option<String>,
117    pub(crate) runtime_process_id: Option<String>,
118    pub(crate) process_events: Option<ToolProcessEventContext>,
119    pub(crate) attachment_store: Arc<dyn AttachmentStore>,
120    pub(crate) direct_completions: crate::DirectCompletionClient<'run>,
121    pub(crate) prepared_payload: serde_json::Value,
122    pub(crate) tool_execution_binding: serde_json::Value,
123    /// The id of the in-flight tool call that is invoking this tool.
124    pub(crate) tool_call_id: Option<String>,
125    pub(crate) attempt_number: u32,
126    pub(crate) max_attempts: u32,
127    pub(crate) replay_key: Option<String>,
128    pub(crate) completion: ToolCompletionState,
129    pub(crate) durable_effects: ToolDurableEffectState,
130    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
131    pub(crate) execution_env_spec: crate::ProcessExecutionEnvSpec,
132    pub(crate) child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
133}
134
/// Event describing a child process that was started on behalf of a tool call.
///
/// Delivered to a [`ToolChildExecutionTraceHook`]. Derives `Debug` so the
/// event can be logged and used in assertions.
#[derive(Clone, Debug)]
pub struct ToolChildProcessStarted {
    /// Identifier of the child process that was started.
    pub process_id: String,
    /// Entry name of the child, when the reporter supplied one.
    pub child_entry_name: Option<String>,
}
140
141#[derive(Clone)]
142pub struct ToolChildExecutionTraceHook {
143    on_child_process_started: Arc<dyn Fn(ToolChildProcessStarted) + Send + Sync>,
144}
145
146impl ToolChildExecutionTraceHook {
147    pub fn new(
148        on_child_process_started: impl Fn(ToolChildProcessStarted) + Send + Sync + 'static,
149    ) -> Self {
150        Self {
151            on_child_process_started: Arc::new(on_child_process_started),
152        }
153    }
154
155    pub fn child_process_started(&self, event: ToolChildProcessStarted) {
156        (self.on_child_process_started)(event);
157    }
158}
159
160#[derive(Clone)]
161pub(crate) struct ToolProcessEventContext {
162    process_id: String,
163    registry: Arc<dyn crate::ProcessRegistry>,
164    store: Option<Arc<dyn crate::RuntimePersistence>>,
165    session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
166    session_graph: Arc<dyn SessionGraphService>,
167    queued_work_driver: Option<crate::QueuedWorkDriver>,
168}
169
170pub(crate) struct ToolContextBuilder<'run> {
171    session_id: String,
172    agent_frame_id: crate::AgentFrameId,
173    sessions: Arc<dyn SessionStateService>,
174    session_lifecycle: Arc<dyn SessionLifecycleService>,
175    session_graph: Arc<dyn SessionGraphService>,
176    processes: Arc<dyn crate::ProcessService>,
177    process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
178    effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
179    runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
180    runtime_execution_context: Option<crate::RuntimeExecutionContext<'run>>,
181    cancellation_token: Option<tokio_util::sync::CancellationToken>,
182    async_process_id: Option<String>,
183    runtime_process_id: Option<String>,
184    process_events: Option<ToolProcessEventContext>,
185    attachment_store: Arc<dyn AttachmentStore>,
186    direct_completions: crate::DirectCompletionClient<'run>,
187    prepared_payload: serde_json::Value,
188    tool_execution_binding: serde_json::Value,
189    tool_call_id: Option<String>,
190    completion: ToolCompletionState,
191    durable_effects: ToolDurableEffectState,
192    parent_invocation: Option<crate::RuntimeInvocation>,
193    execution_env_spec: crate::ProcessExecutionEnvSpec,
194    child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
195}
196
197impl<'run> ToolContextBuilder<'run> {
198    pub(crate) fn from_dispatch(
199        dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
200    ) -> Self {
201        Self {
202            session_id: dispatch.session_id.clone(),
203            agent_frame_id: dispatch.agent_frame_id.clone(),
204            sessions: Arc::clone(&dispatch.sessions),
205            session_lifecycle: Arc::clone(&dispatch.session_lifecycle),
206            session_graph: Arc::clone(&dispatch.session_graph),
207            processes: Arc::clone(&dispatch.processes),
208            process_cancel_ability: Arc::clone(&dispatch.process_cancel_ability),
209            effect_controller: dispatch.effect_controller.clone(),
210            runtime_dispatch: Some(Arc::clone(&dispatch)),
211            runtime_execution_context: None,
212            cancellation_token: None,
213            async_process_id: None,
214            runtime_process_id: None,
215            process_events: None,
216            attachment_store: Arc::clone(&dispatch.attachment_store),
217            direct_completions: dispatch.direct_completions.clone(),
218            prepared_payload: serde_json::Value::Null,
219            tool_execution_binding: serde_json::Value::Null,
220            tool_call_id: None,
221            completion: ToolCompletionState::default(),
222            durable_effects: ToolDurableEffectState::default(),
223            parent_invocation: dispatch.parent_invocation.clone(),
224            execution_env_spec: dispatch.execution_env_spec.clone(),
225            child_execution_trace_hook: None,
226        }
227    }
228
229    #[cfg(any(test, feature = "testing"))]
230    pub(crate) fn tool_call_id(mut self, tool_call_id: impl Into<Option<String>>) -> Self {
231        self.tool_call_id = tool_call_id.into();
232        self
233    }
234
235    pub(crate) fn prepared_call(mut self, call: &PreparedToolCall) -> Self {
236        self.tool_call_id = Some(call.call_id.clone());
237        self.prepared_payload = call.prepared_payload.clone();
238        self
239    }
240
241    pub(crate) fn tool_execution_binding(mut self, binding: serde_json::Value) -> Self {
242        self.tool_execution_binding = binding;
243        self
244    }
245
246    pub(crate) fn cancellation_token(
247        mut self,
248        cancellation_token: Option<tokio_util::sync::CancellationToken>,
249    ) -> Self {
250        self.cancellation_token = cancellation_token;
251        self
252    }
253
254    pub(crate) fn runtime_execution_context(
255        mut self,
256        context: crate::RuntimeExecutionContext<'run>,
257    ) -> Self {
258        self.runtime_execution_context = Some(context);
259        self
260    }
261
262    pub(crate) fn runtime_process_id(mut self, process_id: Option<String>) -> Self {
263        self.runtime_process_id = process_id;
264        self
265    }
266
267    pub(crate) fn async_process(
268        mut self,
269        process_id: impl Into<String>,
270        cancellation_token: tokio_util::sync::CancellationToken,
271    ) -> Self {
272        self.async_process_id = Some(process_id.into());
273        self.cancellation_token = Some(cancellation_token);
274        self
275    }
276
277    pub(crate) fn process_events(
278        mut self,
279        process_id: impl Into<String>,
280        registry: Arc<dyn crate::ProcessRegistry>,
281        store: Option<Arc<dyn crate::RuntimePersistence>>,
282        session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
283        queued_work_driver: Option<crate::QueuedWorkDriver>,
284    ) -> Self {
285        self.process_events = Some(ToolProcessEventContext {
286            process_id: process_id.into(),
287            registry,
288            store,
289            session_store_factory,
290            session_graph: Arc::clone(&self.session_graph),
291            queued_work_driver,
292        });
293        self
294    }
295
296    pub(crate) fn parent_invocation(mut self, metadata: Option<crate::RuntimeInvocation>) -> Self {
297        self.parent_invocation = metadata;
298        self
299    }
300
301    pub(crate) fn child_execution_trace_hook(
302        mut self,
303        hook: Option<ToolChildExecutionTraceHook>,
304    ) -> Self {
305        self.child_execution_trace_hook = hook;
306        self
307    }
308
309    pub(crate) fn build(self) -> ToolContext<'run> {
310        ToolContext {
311            session_id: self.session_id,
312            agent_frame_id: self.agent_frame_id,
313            sessions: self.sessions,
314            session_lifecycle: self.session_lifecycle,
315            processes: self.processes,
316            process_cancel_ability: self.process_cancel_ability,
317            effect_controller: self.effect_controller,
318            runtime_dispatch: self.runtime_dispatch,
319            runtime_execution_context: self.runtime_execution_context,
320            cancellation_token: self.cancellation_token,
321            async_process_id: self.async_process_id,
322            runtime_process_id: self.runtime_process_id,
323            process_events: self.process_events,
324            attachment_store: self.attachment_store,
325            direct_completions: self.direct_completions,
326            prepared_payload: self.prepared_payload,
327            tool_execution_binding: self.tool_execution_binding,
328            tool_call_id: self.tool_call_id,
329            attempt_number: 1,
330            max_attempts: 1,
331            replay_key: None,
332            completion: self.completion,
333            durable_effects: self.durable_effects,
334            parent_invocation: self.parent_invocation,
335            execution_env_spec: self.execution_env_spec,
336            child_execution_trace_hook: self.child_execution_trace_hook,
337        }
338    }
339}
340
341impl<'run> ToolContext<'run> {
342    #[cfg(any(test, feature = "testing"))]
343    #[expect(
344        clippy::too_many_arguments,
345        reason = "testing constructor mirrors the sealed runtime tool context dependencies"
346    )]
347    pub(crate) fn builder(
348        session_id: String,
349        sessions: Arc<dyn SessionStateService>,
350        session_lifecycle: Arc<dyn SessionLifecycleService>,
351        session_graph: Arc<dyn SessionGraphService>,
352        processes: Arc<dyn crate::ProcessService>,
353        process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
354        effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
355        attachment_store: Arc<dyn AttachmentStore>,
356        direct_completions: crate::DirectCompletionClient<'run>,
357    ) -> ToolContextBuilder<'run> {
358        ToolContextBuilder {
359            session_id,
360            agent_frame_id: String::new(),
361            sessions,
362            session_lifecycle,
363            session_graph,
364            processes,
365            process_cancel_ability,
366            effect_controller,
367            runtime_dispatch: None,
368            runtime_execution_context: None,
369            cancellation_token: None,
370            async_process_id: None,
371            runtime_process_id: None,
372            process_events: None,
373            attachment_store,
374            direct_completions,
375            prepared_payload: serde_json::Value::Null,
376            tool_execution_binding: serde_json::Value::Null,
377            tool_call_id: None,
378            completion: ToolCompletionState::default(),
379            durable_effects: ToolDurableEffectState::default(),
380            parent_invocation: None,
381            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
382                crate::PluginOptions::default(),
383                crate::SessionPolicy::default(),
384            ),
385            child_execution_trace_hook: None,
386        }
387    }
388
389    pub(crate) fn from_dispatch(
390        dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
391    ) -> ToolContextBuilder<'run> {
392        ToolContextBuilder::from_dispatch(dispatch)
393    }
394
395    pub fn session_id(&self) -> &str {
396        &self.session_id
397    }
398
399    pub fn agent_frame_id(&self) -> &str {
400        &self.agent_frame_id
401    }
402
403    pub fn sessions(&self) -> ToolSessionAdmin<'run> {
404        ToolSessionAdmin {
405            session_id: self.session_id.clone(),
406            sessions: Arc::clone(&self.sessions),
407            session_lifecycle: Arc::clone(&self.session_lifecycle),
408            effect_controller: self.effect_controller.clone(),
409        }
410    }
411
412    pub fn dispatch(&self) -> ToolDispatchClient<'run> {
413        ToolDispatchClient {
414            context: self.clone(),
415        }
416    }
417
418    pub fn triggers(&self) -> ToolTriggerClient<'run> {
419        ToolTriggerClient {
420            context: self.clone(),
421        }
422    }
423
424    pub fn processes(&self) -> ToolSessionProcessAdmin<'run> {
425        ToolSessionProcessAdmin {
426            session_id: self.session_id.clone(),
427            agent_frame_id: self.agent_frame_id.clone(),
428            processes: Arc::clone(&self.processes),
429            process_cancel_ability: Arc::clone(&self.process_cancel_ability),
430            effect_controller: self.effect_controller.clone(),
431            parent_invocation: self.parent_invocation.clone(),
432            tool_call_id: self.tool_call_id.clone(),
433            execution_env_spec: self.execution_env_spec.clone(),
434        }
435    }
436
437    pub fn emit_child_process_started(
438        &self,
439        process_id: impl Into<String>,
440        child_entry_name: Option<String>,
441    ) {
442        let Some(hook) = &self.child_execution_trace_hook else {
443            return;
444        };
445        hook.child_process_started(ToolChildProcessStarted {
446            process_id: process_id.into(),
447            child_entry_name,
448        });
449    }
450
451    pub fn direct_completions(&self) -> ToolDirectCompletionClient<'run> {
452        ToolDirectCompletionClient {
453            session_id: self.session_id.clone(),
454            tool_call_id: self.tool_call_id.clone(),
455            direct_completions: self.direct_completions.clone(),
456            parent_invocation: self.parent_invocation.clone(),
457            parent_tool_attempt_is_durable: self.parent_invocation.as_ref().is_some_and(
458                |invocation| {
459                    invocation.effect_kind() == Some(crate::RuntimeEffectKind::ToolAttempt)
460                },
461            ) && self
462                .effect_controller
463                .controller()
464                .durability_tier()
465                == crate::DurabilityTier::Durable,
466        }
467    }
468
469    pub fn attachments(&self) -> ToolAttachmentClient {
470        ToolAttachmentClient {
471            store: Arc::clone(&self.attachment_store),
472        }
473    }
474
475    pub fn process_events(&self) -> ToolProcessEventClient {
476        ToolProcessEventClient {
477            context: self.process_events.clone(),
478        }
479    }
480
481    /// Borrow this tool call's durable effect boundary.
482    ///
483    /// This is available only while executing a prepared tool call under a
484    /// controller that explicitly supports durable tool effects. The returned
485    /// facade records JSON steps and await-event waits in the caller's existing
486    /// effect log; it does not expose the underlying workflow engine context.
487    pub fn durable_effects(&self) -> Result<ToolDurableEffects<'_, 'run>, crate::RuntimeError> {
488        let Some(tool_call_id) = self.tool_call_id.as_deref() else {
489            return Err(crate::RuntimeError::new(
490                "durable_effects_missing_call_id",
491                "durable effects require a prepared tool call id",
492            ));
493        };
494        if tool_call_id.trim().is_empty() {
495            return Err(crate::RuntimeError::new(
496                "durable_effects_missing_call_id",
497                "durable effects require a non-empty prepared tool call id",
498            ));
499        }
500        let scoped = self.effect_controller.scoped();
501        if !scoped.controller().supports_durable_effects() {
502            return Err(crate::RuntimeError::new(
503                "durable_effects_unavailable",
504                "this effect controller does not support durable tool effects",
505            ));
506        }
507        if self.parent_invocation.as_ref().is_some_and(|invocation| {
508            invocation.effect_kind() == Some(crate::RuntimeEffectKind::ToolAttempt)
509        }) && scoped.controller().durability_tier() == crate::DurabilityTier::Durable
510        {
511            return Err(crate::RuntimeError::new(
512                "durable_effects_unavailable_in_tool_attempt",
513                "durable tool sub-effects are not available inside a journaled tool attempt",
514            ));
515        }
516        Ok(ToolDurableEffects { context: self })
517    }
518
519    pub fn cancellation_token(&self) -> Option<&tokio_util::sync::CancellationToken> {
520        self.cancellation_token.as_ref()
521    }
522
523    pub fn async_process_id(&self) -> Option<&str> {
524        self.async_process_id.as_deref()
525    }
526
527    pub fn runtime_process_id(&self) -> Option<&str> {
528        self.async_process_id
529            .as_deref()
530            .or(self.runtime_process_id.as_deref())
531            .or_else(|| {
532                self.process_events
533                    .as_ref()
534                    .map(|context| context.process_id.as_str())
535            })
536    }
537
538    pub fn tool_call_id(&self) -> Option<&str> {
539        self.tool_call_id.as_deref()
540    }
541
542    pub fn prepared_payload(&self) -> &serde_json::Value {
543        &self.prepared_payload
544    }
545
546    pub fn tool_execution_binding(&self) -> &serde_json::Value {
547        &self.tool_execution_binding
548    }
549
550    pub fn decode_prepared_payload<T>(&self) -> Result<T, serde_json::Error>
551    where
552        T: serde::de::DeserializeOwned,
553    {
554        serde_json::from_value(self.prepared_payload.clone())
555    }
556
557    pub fn attempt_number(&self) -> u32 {
558        self.attempt_number
559    }
560
561    pub fn max_attempts(&self) -> u32 {
562        self.max_attempts
563    }
564
565    pub fn replay_key(&self) -> Option<&str> {
566        self.replay_key.as_deref()
567    }
568
569    /// Obtain the durable completion key for this call, required before returning
570    /// [`ToolResult::Pending`](crate::ToolResult::Pending).
571    ///
572    /// A tool that defers its outcome (waiting on a webhook, human approval, or another
573    /// service) calls this, hands the returned [`AwaitEventKey`](crate::AwaitEventKey)
574    /// to whatever will complete the work out-of-band, and then returns
575    /// `ToolResult::Pending(..)`. The key names the durable wait the runtime parks the
576    /// call on; the external resolver delivers the result against it later.
577    ///
578    /// The key is stored on the context and consumed by the dispatcher when the tool
579    /// returns `Pending`. Returning `Pending` without first calling this fails the call
580    /// with `pending_tool_missing_completion_key`. Calls made outside a prepared tool
581    /// invocation (no tool call id) fail with `tool_completion_key_missing_call_id`.
582    pub async fn completion_key(&self) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
583        let tool_call_id = self.tool_call_id.clone().ok_or_else(|| {
584            crate::RuntimeError::new(
585                "tool_completion_key_missing_call_id",
586                "completion keys require a prepared tool call id",
587            )
588        })?;
589        let scoped = self.effect_controller.scoped();
590        let key = scoped
591            .controller()
592            .await_event_key(
593                scoped.execution_scope(),
594                crate::AwaitEventWaitIdentity::tool_completion(tool_call_id),
595            )
596            .await?;
597        self.completion.store(key)
598    }
599
600    pub(crate) fn take_completion_key(
601        &self,
602    ) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
603        self.completion.take()
604    }
605
606    pub fn with_async_process(
607        mut self,
608        process_id: impl Into<String>,
609        cancellation_token: tokio_util::sync::CancellationToken,
610    ) -> Self {
611        self.async_process_id = Some(process_id.into());
612        self.runtime_process_id = self.async_process_id.clone();
613        self.cancellation_token = Some(cancellation_token);
614        self
615    }
616
617    #[cfg(any(test, feature = "testing"))]
618    #[doc(hidden)]
619    pub fn with_process_events_for_testing(
620        mut self,
621        process_id: impl Into<String>,
622        registry: Arc<dyn crate::ProcessRegistry>,
623    ) -> Self {
624        self.process_events = Some(ToolProcessEventContext {
625            process_id: process_id.into(),
626            registry,
627            store: None,
628            session_store_factory: None,
629            session_graph: Arc::new(crate::plugin::NoopSessionManager),
630            queued_work_driver: None,
631        });
632        self
633    }
634
635    pub(crate) fn with_retry_context(
636        mut self,
637        tool_name: &str,
638        attempt_number: u32,
639        max_attempts: u32,
640    ) -> Self {
641        self.attempt_number = attempt_number.max(1);
642        self.max_attempts = max_attempts.max(1);
643        self.replay_key = self
644            .tool_call_id
645            .as_ref()
646            .map(|call_id| format!("lash-tool:{}:{call_id}:{tool_name}", self.session_id));
647        self
648    }
649
650    pub(crate) fn with_prepared_payload(mut self, payload: serde_json::Value) -> Self {
651        self.prepared_payload = payload;
652        self
653    }
654
655    pub(crate) fn with_tool_execution_binding(mut self, binding: serde_json::Value) -> Self {
656        self.tool_execution_binding = binding;
657        self
658    }
659
660    /// Constructor reserved for `lash_core::testing` helpers. Do not call directly;
661    /// use [`lash_core::testing::mock_tool_context`] instead.
662    #[cfg(any(test, feature = "testing"))]
663    #[doc(hidden)]
664    #[expect(
665        clippy::too_many_arguments,
666        reason = "test-only constructor mirrors the sealed runtime tool context"
667    )]
668    pub fn __for_testing(
669        session_id: String,
670        sessions: Arc<dyn SessionStateService>,
671        session_lifecycle: Arc<dyn SessionLifecycleService>,
672        session_graph: Arc<dyn SessionGraphService>,
673        processes: Arc<dyn crate::ProcessService>,
674        attachment_store: Arc<dyn AttachmentStore>,
675        direct_completions: crate::DirectCompletionClient<'static>,
676        tool_call_id: Option<String>,
677    ) -> ToolContext<'static> {
678        ToolContext::builder(
679            session_id,
680            sessions,
681            session_lifecycle,
682            session_graph,
683            processes,
684            Arc::new(crate::DefaultProcessCancelAbility),
685            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
686                crate::InlineRuntimeEffectController,
687            )),
688            attachment_store,
689            direct_completions,
690        )
691        .tool_call_id(tool_call_id)
692        .build()
693    }
694
695    /// Constructor reserved for tests that need a custom process-cancel host
696    /// ability. Do not call directly; prefer public testing helpers when they
697    /// cover the case.
698    #[cfg(any(test, feature = "testing"))]
699    #[doc(hidden)]
700    #[expect(
701        clippy::too_many_arguments,
702        reason = "test-only constructor mirrors the sealed runtime context"
703    )]
704    pub fn __for_testing_with_process_cancel_ability(
705        session_id: String,
706        sessions: Arc<dyn SessionStateService>,
707        session_lifecycle: Arc<dyn SessionLifecycleService>,
708        session_graph: Arc<dyn SessionGraphService>,
709        processes: Arc<dyn crate::ProcessService>,
710        process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
711        attachment_store: Arc<dyn AttachmentStore>,
712        direct_completions: crate::DirectCompletionClient<'static>,
713        tool_call_id: Option<String>,
714    ) -> ToolContext<'static> {
715        ToolContext::builder(
716            session_id,
717            sessions,
718            session_lifecycle,
719            session_graph,
720            processes,
721            process_cancel_ability,
722            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
723                crate::InlineRuntimeEffectController,
724            )),
725            attachment_store,
726            direct_completions,
727        )
728        .tool_call_id(tool_call_id)
729        .build()
730    }
731}
732
733/// Durable effect operations available to advanced in-process tools.
734///
735/// The facade borrows the caller's existing runtime effect boundary. It records
736/// JSON-only local steps and awaits host-signed event keys without exposing
737/// Restate, Temporal, or any other workflow-native context.
738pub struct ToolDurableEffects<'ctx, 'run> {
739    context: &'ctx ToolContext<'run>,
740}
741
742impl<'ctx, 'run> ToolDurableEffects<'ctx, 'run> {
743    pub async fn run_json<F, Fut>(
744        &self,
745        step_id: impl Into<String>,
746        input: serde_json::Value,
747        run: F,
748    ) -> Result<serde_json::Value, crate::RuntimeError>
749    where
750        F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
751        Fut: Future<Output = Result<serde_json::Value, crate::RuntimeError>> + Send + 'run,
752    {
753        let step_id = step_id.into();
754        if step_id.trim().is_empty() {
755            return Err(crate::RuntimeError::new(
756                "durable_effect_empty_step_id",
757                "durable effect step id must be non-empty",
758            ));
759        }
760        self.context.durable_effects.reserve_step(&step_id)?;
761        let invocation = self.step_invocation(
762            format!("durable-step:{step_id}"),
763            crate::RuntimeEffectKind::DurableStep,
764            format!("durable-step:{step_id}"),
765        )?;
766        let outcome = self
767            .context
768            .effect_controller
769            .controller()
770            .execute_effect(
771                crate::RuntimeEffectEnvelope::new(
772                    invocation,
773                    crate::RuntimeEffectCommand::DurableStep {
774                        step_id,
775                        input: input.clone(),
776                    },
777                ),
778                crate::RuntimeEffectLocalExecutor::durable_step(run),
779            )
780            .await
781            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
782        outcome
783            .into_durable_step()
784            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)
785    }
786
787    pub async fn external_event_key(
788        &self,
789        key: impl Into<String>,
790    ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
791        let key = key.into();
792        if key.trim().is_empty() {
793            return Err(crate::RuntimeError::new(
794                "durable_effect_empty_event_key",
795                "durable effect external event key must be non-empty",
796            ));
797        }
798        let scoped = self.context.effect_controller.scoped();
799        scoped
800            .controller()
801            .await_event_key(
802                scoped.execution_scope(),
803                crate::AwaitEventWaitIdentity::Custom { key },
804            )
805            .await
806    }
807
808    pub async fn await_event_json(
809        &self,
810        key: crate::AwaitEventKey,
811    ) -> Result<serde_json::Value, crate::RuntimeError> {
812        let invocation = self.step_invocation(
813            format!("await-event:{}", key.key_id),
814            crate::RuntimeEffectKind::AwaitEvent,
815            format!("await-event:{}", key.key_id),
816        )?;
817        let cancellation = self.context.cancellation_token.clone().unwrap_or_default();
818        let clock = self
819            .context
820            .runtime_dispatch
821            .as_ref()
822            .map(|dispatch| Arc::clone(&dispatch.clock))
823            .unwrap_or_else(|| Arc::new(crate::SystemClock));
824        let outcome = self
825            .context
826            .effect_controller
827            .controller()
828            .execute_effect(
829                crate::RuntimeEffectEnvelope::new(
830                    invocation,
831                    crate::RuntimeEffectCommand::AwaitEvent { key },
832                ),
833                crate::RuntimeEffectLocalExecutor::await_event_with_clock(
834                    cancellation,
835                    None,
836                    clock,
837                ),
838            )
839            .await
840            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
841        match outcome
842            .into_await_event()
843            .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?
844        {
845            crate::Resolution::Ok(value) => Ok(value),
846            crate::Resolution::Err(err) => Err(crate::RuntimeError::new(err.code, err.message)),
847            crate::Resolution::Timeout => Err(crate::RuntimeError::new(
848                "durable_effect_event_timeout",
849                "durable effect external event wait timed out",
850            )),
851            crate::Resolution::Cancelled => Err(crate::RuntimeError::new(
852                "durable_effect_event_cancelled",
853                "durable effect external event wait was cancelled",
854            )),
855        }
856    }
857
858    pub async fn emit_process_event(
859        &self,
860        event_type: impl Into<String>,
861        payload: serde_json::Value,
862    ) -> Result<crate::ProcessEvent, crate::RuntimeError> {
863        let Some(process) = self.context.process_events.as_ref() else {
864            return Err(crate::RuntimeError::new(
865                "durable_effect_process_event_unavailable",
866                "durable effect process events are unavailable outside a durable process",
867            ));
868        };
869        let event_type = event_type.into();
870        if event_type.trim().is_empty() {
871            return Err(crate::RuntimeError::new(
872                "durable_effect_empty_process_event_type",
873                "durable effect process event type must be non-empty",
874            ));
875        }
876        let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
877            crate::RuntimeError::new(
878                "durable_effects_missing_call_id",
879                "durable effects require a prepared tool call id",
880            )
881        })?;
882        let sequence = self.context.durable_effects.next_process_event_sequence();
883        let request = crate::ProcessEventAppendRequest::new(event_type, payload).with_replay_key(
884            format!("tool:{tool_call_id}:durable-process-event:{sequence}"),
885        );
886        self.context
887            .process_events()
888            .emit_request(request)
889            .await
890            .map_err(|err| {
891                crate::RuntimeError::new(
892                    "durable_effect_process_event_append_failed",
893                    err.to_string(),
894                )
895            })
896            .and_then(|event| {
897                if event.process_id == process.process_id {
898                    Ok(event)
899                } else {
900                    Err(crate::RuntimeError::new(
901                        "durable_effect_process_event_process_mismatch",
902                        "process event append returned an event for a different process",
903                    ))
904                }
905            })
906    }
907
908    fn step_invocation(
909        &self,
910        effect_id_suffix: impl Into<String>,
911        kind: crate::RuntimeEffectKind,
912        replay_suffix: impl AsRef<str>,
913    ) -> Result<crate::RuntimeInvocation, crate::RuntimeError> {
914        let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
915            crate::RuntimeError::new(
916                "durable_effects_missing_call_id",
917                "durable effects require a prepared tool call id",
918            )
919        })?;
920        let effect_id_suffix = effect_id_suffix.into();
921        if let Some(parent) = self.context.parent_invocation.as_ref() {
922            return Ok(crate::runtime::causal::child_effect_invocation(
923                parent,
924                format!("{tool_call_id}:{effect_id_suffix}"),
925                kind,
926                replay_suffix,
927            ));
928        }
929        let scoped = self.context.effect_controller.scoped();
930        let replay_key = format!(
931            "{}:tool:{tool_call_id}:{}",
932            scoped.scope_id(),
933            replay_suffix.as_ref()
934        );
935        Ok(crate::RuntimeInvocation::effect(
936            crate::RuntimeScope::new(self.context.session_id.clone()),
937            format!("{tool_call_id}:{effect_id_suffix}"),
938            kind,
939            replay_key,
940        ))
941    }
942}
943
/// Runtime-prepared executable tool call.
///
/// The raw model/provider identity remains visible, but any argument rewrites
/// and provider-owned context projections are frozen before the call crosses a
/// runtime effect or process boundary.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PreparedToolCall {
    /// Identifier of this call; [`PreparedToolCall::identity`] copies it from
    /// the pending call.
    pub call_id: String,
    /// Identifier of the tool the call targets.
    pub tool_id: ToolId,
    /// Tool name; [`PreparedToolCall::identity`] copies it from the pending
    /// call.
    pub tool_name: String,
    /// JSON arguments of the call.
    pub args: serde_json::Value,
    /// Optional provider replay metadata. Omitted from the serialized form when
    /// `None` and defaulted when absent on deserialization.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub replay: Option<ProviderReplayMeta>,
    /// JSON payload attached at preparation time. `Null` for identity-prepared
    /// calls; a `Null` value is omitted from the serialized form and restored
    /// by the serde default on deserialization.
    #[serde(default, skip_serializing_if = "serde_json::Value::is_null")]
    pub prepared_payload: serde_json::Value,
}
960
961impl PreparedToolCall {
962    pub fn identity(tool_id: ToolId, call: crate::sansio::PendingToolCall) -> Self {
963        Self {
964            call_id: call.call_id,
965            tool_id,
966            tool_name: call.tool_name,
967            args: call.args,
968            replay: call.replay,
969            prepared_payload: serde_json::Value::Null,
970        }
971    }
972
973    pub fn from_parts(
974        call_id: impl Into<String>,
975        tool_id: impl Into<ToolId>,
976        tool_name: impl Into<String>,
977        args: serde_json::Value,
978        replay: Option<ProviderReplayMeta>,
979        prepared_payload: serde_json::Value,
980    ) -> Self {
981        Self {
982            call_id: call_id.into(),
983            tool_id: tool_id.into(),
984            tool_name: tool_name.into(),
985            args,
986            replay,
987            prepared_payload,
988        }
989    }
990}
991
/// One ordered child inside a runtime-prepared tool batch.
///
/// The call itself carries the executable provider payload. `replay_suffix`
/// is the deterministic suffix used for child effects such as retry sleeps or
/// pending completion awaits when the batch is the durable parent.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PreparedToolBatchCall {
    /// The prepared call executed for this child.
    pub call: PreparedToolCall,
    /// Replay suffix of this child. The [`PreparedToolBatch`] constructors
    /// build it as `child:{index}:{call_id}`.
    pub replay_suffix: String,
    /// Optional execution grant attached to this child. Omitted from the
    /// serialized form when `None` and defaulted when absent on
    /// deserialization.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub execution_grant: Option<Box<ToolExecutionGrant>>,
}
1004
/// Runtime-prepared executable tool batch.
///
/// The vector order is source order. Scheduling may run parallel-safe tools
/// concurrently, but launches and pending completion consumption are projected
/// back through this order.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PreparedToolBatch {
    /// Identifier of the batch, as supplied to the constructor.
    pub batch_id: String,
    /// Children of the batch, in the order they were passed to the constructor.
    pub calls: Vec<PreparedToolBatchCall>,
}
1015
1016impl PreparedToolBatch {
1017    pub fn new(batch_id: impl Into<String>, calls: Vec<PreparedToolCall>) -> Self {
1018        let batch_id = batch_id.into();
1019        let calls = calls
1020            .into_iter()
1021            .enumerate()
1022            .map(|(index, call)| PreparedToolBatchCall {
1023                replay_suffix: format!("child:{index}:{}", call.call_id),
1024                call,
1025                execution_grant: None,
1026            })
1027            .collect();
1028        Self { batch_id, calls }
1029    }
1030
1031    pub fn new_with_grants(
1032        batch_id: impl Into<String>,
1033        calls: Vec<(PreparedToolCall, Option<ToolExecutionGrant>)>,
1034    ) -> Self {
1035        let batch_id = batch_id.into();
1036        let calls = calls
1037            .into_iter()
1038            .enumerate()
1039            .map(|(index, (call, execution_grant))| PreparedToolBatchCall {
1040                replay_suffix: format!("child:{index}:{}", call.call_id),
1041                call,
1042                execution_grant: execution_grant.map(Box::new),
1043            })
1044            .collect();
1045        Self { batch_id, calls }
1046    }
1047
1048    pub fn is_empty(&self) -> bool {
1049        self.calls.is_empty()
1050    }
1051
1052    pub fn len(&self) -> usize {
1053        self.calls.len()
1054    }
1055}
1056
/// Explicit authority to execute a tool outside Tool Catalog membership.
///
/// Normal tool calls are authorized by catalog membership. A grant is a
/// separate, caller-provided capability used by deferred resolution flows: it
/// carries the manifest/contract to validate the call plus an opaque host
/// execution binding that providers can inspect from the prepare and execute
/// contexts.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ToolExecutionGrant {
    /// Tool identity and model-facing metadata authorized by the grant.
    pub manifest: ToolManifest,
    /// Contract used to validate granted call arguments without consulting the
    /// current Tool Catalog.
    pub contract: Box<ToolContract>,
    /// Explicit registry source route for registry-backed execution. Direct
    /// non-registry providers may ignore this; [`ToolRegistry`] requires it.
    /// [`ToolExecutionGrant::new`] leaves it `None`.
    pub source_id: Option<String>,
    /// Opaque host routing payload passed to prepare and execute contexts.
    /// [`ToolExecutionGrant::new`] initializes it to JSON `Null`.
    pub execution_binding: serde_json::Value,
}
1077
1078impl ToolExecutionGrant {
1079    pub fn new(manifest: ToolManifest, contract: ToolContract) -> Self {
1080        Self {
1081            manifest,
1082            contract: Box::new(contract),
1083            source_id: None,
1084            execution_binding: serde_json::Value::Null,
1085        }
1086    }
1087
1088    pub fn from_definition(definition: ToolDefinition) -> Self {
1089        Self::new(definition.manifest(), definition.contract())
1090    }
1091
1092    pub fn with_source_id(mut self, source_id: impl Into<String>) -> Self {
1093        self.source_id = Some(source_id.into());
1094        self
1095    }
1096
1097    pub fn with_execution_binding(mut self, execution_binding: serde_json::Value) -> Self {
1098        self.execution_binding = execution_binding;
1099        self
1100    }
1101}
1102
/// Context made available to a provider while a tool call is being prepared
/// (see [`ToolPrepareCall::context`]).
#[derive(Clone)]
pub struct ToolPrepareContext {
    /// Session whose snapshot and tool catalog the accessors query.
    session_id: String,
    /// Session state service backing the snapshot and catalog lookups.
    sessions: Arc<dyn SessionStateService>,
    /// Turn context; also the source of plugin inputs.
    turn_context: crate::TurnContext,
    /// Id of the tool call being prepared, when one is set.
    tool_call_id: Option<String>,
    /// Execution binding JSON exposed through `tool_execution_binding()`.
    tool_execution_binding: serde_json::Value,
}
1111
1112impl ToolPrepareContext {
1113    pub(crate) fn with_execution_binding(
1114        session_id: String,
1115        sessions: Arc<dyn SessionStateService>,
1116        turn_context: crate::TurnContext,
1117        tool_call_id: Option<String>,
1118        tool_execution_binding: serde_json::Value,
1119    ) -> Self {
1120        Self {
1121            session_id,
1122            sessions,
1123            turn_context,
1124            tool_call_id,
1125            tool_execution_binding,
1126        }
1127    }
1128
1129    pub fn session_id(&self) -> &str {
1130        &self.session_id
1131    }
1132
1133    pub fn tool_call_id(&self) -> Option<&str> {
1134        self.tool_call_id.as_deref()
1135    }
1136
1137    pub fn tool_execution_binding(&self) -> &serde_json::Value {
1138        &self.tool_execution_binding
1139    }
1140
1141    pub fn turn_context(&self) -> &crate::TurnContext {
1142        &self.turn_context
1143    }
1144
1145    pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
1146    where
1147        T: 'static,
1148    {
1149        self.turn_context.plugin_input::<T>(plugin_id)
1150    }
1151
1152    pub async fn session_snapshot(&self) -> Result<SessionSnapshot, PluginError> {
1153        self.sessions.snapshot_session(&self.session_id).await
1154    }
1155
1156    pub async fn tool_catalog(&self) -> Result<Vec<serde_json::Value>, PluginError> {
1157        self.sessions.tool_catalog(&self.session_id).await
1158    }
1159
1160    pub async fn shared_tool_catalog(
1161        &self,
1162    ) -> Result<std::sync::Arc<Vec<serde_json::Value>>, PluginError> {
1163        self.sessions.shared_tool_catalog(&self.session_id).await
1164    }
1165}
1166
/// Inputs handed to [`ToolProvider::prepare_tool_call`].
pub struct ToolPrepareCall<'a> {
    /// Identifier of the tool the pending call targets.
    pub tool_id: ToolId,
    /// The pending call to prepare, passed by value.
    pub pending: crate::sansio::PendingToolCall,
    /// Borrowed prepare-time context (session, turn context, call id, binding).
    pub context: &'a ToolPrepareContext,
}
1173
/// Per-call inputs handed to [`ToolProvider::execute`].
///
/// Fields are `pub` because `ToolCall` is a transient borrow; consumers
/// typically destructure (`let ToolCall { name, args, .. } = call`). The
/// stable surface lives on [`ToolContext`] (sealed) and the runtime's
/// dispatcher, which constructs `ToolCall` values.
pub struct ToolCall<'a> {
    /// Name of the tool being invoked.
    pub name: &'a str,
    /// JSON arguments of the call.
    pub args: &'a serde_json::Value,
    /// Execution context for this call.
    pub context: &'a ToolContext<'a>,
    /// Optional sender for streaming progress messages; `None` when no sender
    /// is supplied.
    pub progress: Option<&'a ProgressSender>,
}
1186
1187/// Trait for providing tools to the sandbox. Implement this per-project.
1188///
1189/// Implementations supply cheap [`ToolManifest`]s, lazily resolved
1190/// [`ToolContract`]s, and a single
1191/// [`execute`](Self::execute) method that handles every call. Tools that
1192/// need session state read it from `call.context`; tools that stream
1193/// progress send through `call.progress`.
1194#[async_trait::async_trait]
1195pub trait ToolProvider: Send + Sync + 'static {
1196    fn tool_manifests(&self) -> Vec<ToolManifest>;
1197    fn resolve_manifest(&self, name: &str) -> Option<ToolManifest> {
1198        self.tool_manifests()
1199            .into_iter()
1200            .find(|manifest| manifest.name == name)
1201    }
1202    fn resolve_manifest_by_id(&self, id: &ToolId) -> Option<ToolManifest> {
1203        self.tool_manifests()
1204            .into_iter()
1205            .find(|manifest| manifest.id == *id)
1206    }
1207    fn resolve_contract(&self, name: &str) -> Option<Arc<ToolContract>>;
1208    fn resolve_contract_by_id(&self, id: &ToolId) -> Option<Arc<ToolContract>> {
1209        let manifest = self.resolve_manifest_by_id(id)?;
1210        self.resolve_contract(&manifest.name)
1211    }
1212    async fn prepare_tool_call(
1213        &self,
1214        call: ToolPrepareCall<'_>,
1215    ) -> Result<PreparedToolCall, ToolResult> {
1216        Ok(PreparedToolCall::identity(call.tool_id, call.pending))
1217    }
1218    async fn prepare_granted_tool_call(
1219        &self,
1220        grant: &ToolExecutionGrant,
1221        call: ToolPrepareCall<'_>,
1222    ) -> Result<PreparedToolCall, ToolResult> {
1223        let _ = call;
1224        Err(ToolResult::err_fmt(format_args!(
1225            "Granted execution is unsupported for tool id `{}`",
1226            grant.manifest.id
1227        )))
1228    }
1229    async fn execute(&self, call: ToolCall<'_>) -> ToolResult;
1230    async fn execute_granted(
1231        &self,
1232        grant: &ToolExecutionGrant,
1233        args: &serde_json::Value,
1234        context: &ToolContext<'_>,
1235        progress: Option<&ProgressSender>,
1236    ) -> ToolResult {
1237        let _ = (args, context, progress);
1238        ToolResult::err_fmt(format_args!(
1239            "Granted execution is unsupported for tool id `{}`",
1240            grant.manifest.id
1241        ))
1242    }
1243    async fn execute_by_id(
1244        &self,
1245        tool_id: &ToolId,
1246        args: &serde_json::Value,
1247        context: &ToolContext<'_>,
1248        progress: Option<&ProgressSender>,
1249    ) -> ToolResult {
1250        let Some(manifest) = self.resolve_manifest_by_id(tool_id) else {
1251            return ToolResult::err_fmt(format!("Unknown tool id: {tool_id}"));
1252        };
1253        self.execute(ToolCall {
1254            name: &manifest.name,
1255            args,
1256            context,
1257            progress,
1258        })
1259        .await
1260    }
1261}
1262
1263#[cfg(test)]
1264mod tests {
1265    use super::*;
1266    use crate::ProcessRegistry;
1267    use crate::RuntimeEffectController;
1268    use std::sync::atomic::{AtomicU64, Ordering};
1269
1270    struct NoDurableEffectController;
1271
1272    #[async_trait::async_trait]
1273    impl crate::RuntimeEffectController for NoDurableEffectController {
1274        async fn execute_effect(
1275            &self,
1276            _envelope: crate::RuntimeEffectEnvelope,
1277            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
1278        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
1279            Err(crate::RuntimeEffectControllerError::new(
1280                "unexpected_effect",
1281                "test controller should not execute effects",
1282            ))
1283        }
1284    }
1285
1286    fn test_context_with_controller(
1287        tool_call_id: Option<String>,
1288        controller: Arc<dyn crate::RuntimeEffectController>,
1289    ) -> ToolContext<'static> {
1290        ToolContext::builder(
1291            "session-1".to_string(),
1292            Arc::new(crate::testing::MockSessionManager::default()),
1293            Arc::new(crate::testing::MockSessionManager::default()),
1294            Arc::new(crate::testing::MockSessionManager::default()),
1295            Arc::new(crate::UnavailableProcessService),
1296            Arc::new(crate::DefaultProcessCancelAbility),
1297            crate::runtime::RuntimeEffectControllerHandle::shared(controller),
1298            Arc::new(crate::InMemoryAttachmentStore::new()),
1299            crate::DirectCompletionClient::unavailable(
1300                "direct completions are unavailable in this test context",
1301            ),
1302        )
1303        .tool_call_id(tool_call_id)
1304        .build()
1305    }
1306
1307    #[test]
1308    fn tool_context_builder_carries_call_payload_and_cancellation_state() {
1309        let cancellation = tokio_util::sync::CancellationToken::new();
1310        let prepared = PreparedToolCall::from_parts(
1311            "call-1",
1312            "tool:demo_tool",
1313            "demo_tool",
1314            serde_json::json!({ "input": true }),
1315            None,
1316            serde_json::json!({ "prepared": true }),
1317        );
1318
1319        let context = ToolContext::builder(
1320            "session-1".to_string(),
1321            Arc::new(crate::testing::MockSessionManager::default()),
1322            Arc::new(crate::testing::MockSessionManager::default()),
1323            Arc::new(crate::testing::MockSessionManager::default()),
1324            Arc::new(crate::UnavailableProcessService),
1325            Arc::new(crate::DefaultProcessCancelAbility),
1326            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
1327                crate::InlineRuntimeEffectController,
1328            )),
1329            Arc::new(crate::InMemoryAttachmentStore::new()),
1330            crate::DirectCompletionClient::unavailable(
1331                "direct completions are unavailable in this test context",
1332            ),
1333        )
1334        .prepared_call(&prepared)
1335        .cancellation_token(Some(cancellation.clone()))
1336        .async_process("process-1", cancellation.clone())
1337        .build();
1338
1339        assert_eq!(context.session_id(), "session-1");
1340        assert_eq!(context.tool_call_id(), Some("call-1"));
1341        assert_eq!(
1342            context.prepared_payload(),
1343            &serde_json::json!({ "prepared": true })
1344        );
1345        assert_eq!(context.async_process_id(), Some("process-1"));
1346        assert!(context.cancellation_token().is_some());
1347    }
1348
1349    #[test]
1350    fn durable_effects_requires_prepared_call_id_and_supporting_controller() {
1351        let missing_call =
1352            test_context_with_controller(None, Arc::new(crate::InlineRuntimeEffectController));
1353        let err = match missing_call.durable_effects() {
1354            Ok(_) => panic!("missing prepared tool call id should fail"),
1355            Err(err) => err,
1356        };
1357        assert_eq!(err.code.as_str(), "durable_effects_missing_call_id");
1358
1359        let unsupported = test_context_with_controller(
1360            Some("call-1".to_string()),
1361            Arc::new(NoDurableEffectController),
1362        );
1363        let err = match unsupported.durable_effects() {
1364            Ok(_) => panic!("unsupported controller should fail"),
1365            Err(err) => err,
1366        };
1367        assert_eq!(err.code.as_str(), "durable_effects_unavailable");
1368    }
1369
1370    #[tokio::test]
1371    async fn durable_run_json_executes_and_maps_closure_errors() {
1372        let context = test_context_with_controller(
1373            Some("call-run-json".to_string()),
1374            Arc::new(crate::InlineRuntimeEffectController),
1375        );
1376        let durable = context.durable_effects().expect("durable effects");
1377        let value = durable
1378            .run_json(
1379                "create",
1380                serde_json::json!({ "x": 1 }),
1381                |input| async move { Ok(serde_json::json!({ "seen": input["x"] })) },
1382            )
1383            .await
1384            .expect("durable step");
1385        assert_eq!(value, serde_json::json!({ "seen": 1 }));
1386
1387        let err = durable
1388            .run_json("fail", serde_json::json!({}), |_| async {
1389                Err(crate::RuntimeError::new(
1390                    "durable_step_failed",
1391                    "step failed",
1392                ))
1393            })
1394            .await
1395            .expect_err("closure error");
1396        assert_eq!(err.code.as_str(), "durable_step_failed");
1397        assert_eq!(err.message, "step failed");
1398    }
1399
1400    #[tokio::test]
1401    async fn durable_run_json_rejects_empty_or_duplicate_step_ids_before_running() {
1402        let context = test_context_with_controller(
1403            Some("call-step-ids".to_string()),
1404            Arc::new(crate::InlineRuntimeEffectController),
1405        );
1406        let durable = context.durable_effects().expect("durable effects");
1407        let runs = Arc::new(AtomicU64::new(0));
1408
1409        let err = durable
1410            .run_json("", serde_json::Value::Null, {
1411                let runs = Arc::clone(&runs);
1412                move |_| async move {
1413                    runs.fetch_add(1, Ordering::Relaxed);
1414                    Ok(serde_json::Value::Null)
1415                }
1416            })
1417            .await
1418            .expect_err("empty step id");
1419        assert_eq!(err.code.as_str(), "durable_effect_empty_step_id");
1420        assert_eq!(runs.load(Ordering::Relaxed), 0);
1421
1422        durable
1423            .run_json("same", serde_json::Value::Null, {
1424                let runs = Arc::clone(&runs);
1425                move |_| async move {
1426                    runs.fetch_add(1, Ordering::Relaxed);
1427                    Ok(serde_json::Value::Null)
1428                }
1429            })
1430            .await
1431            .expect("first step");
1432        let err = durable
1433            .run_json("same", serde_json::Value::Null, {
1434                let runs = Arc::clone(&runs);
1435                move |_| async move {
1436                    runs.fetch_add(1, Ordering::Relaxed);
1437                    Ok(serde_json::Value::Null)
1438                }
1439            })
1440            .await
1441            .expect_err("duplicate step id");
1442        assert_eq!(err.code.as_str(), "durable_effect_duplicate_step_id");
1443        assert_eq!(runs.load(Ordering::Relaxed), 1);
1444    }
1445
1446    #[tokio::test]
1447    async fn durable_external_event_key_is_custom_and_stable() {
1448        let context = test_context_with_controller(
1449            Some("call-event-key".to_string()),
1450            Arc::new(crate::InlineRuntimeEffectController),
1451        );
1452        let durable = context.durable_effects().expect("durable effects");
1453        let first = durable
1454            .external_event_key("tool-event-stable")
1455            .await
1456            .expect("first key");
1457        let second = durable
1458            .external_event_key("tool-event-stable")
1459            .await
1460            .expect("second key");
1461
1462        assert_eq!(first, second);
1463        assert_eq!(
1464            first.wait,
1465            crate::AwaitEventWaitIdentity::Custom {
1466                key: "tool-event-stable".to_string()
1467            }
1468        );
1469    }
1470
1471    #[tokio::test]
1472    async fn durable_await_event_json_maps_terminal_resolutions() {
1473        let controller = Arc::new(crate::InlineRuntimeEffectController);
1474        let context =
1475            test_context_with_controller(Some("call-await-event".to_string()), controller.clone());
1476        let durable = context.durable_effects().expect("durable effects");
1477
1478        let ok_key = durable
1479            .external_event_key("tool-event-ok")
1480            .await
1481            .expect("ok key");
1482        controller
1483            .resolve_await_event(
1484                &ok_key,
1485                crate::Resolution::Ok(serde_json::json!({ "answer": 42 })),
1486            )
1487            .await
1488            .expect("resolve ok");
1489        let value = durable
1490            .await_event_json(ok_key)
1491            .await
1492            .expect("await ok value");
1493        assert_eq!(value, serde_json::json!({ "answer": 42 }));
1494
1495        let err_key = durable
1496            .external_event_key("tool-event-err")
1497            .await
1498            .expect("err key");
1499        controller
1500            .resolve_await_event(
1501                &err_key,
1502                crate::Resolution::Err(crate::ExternalCompletionError::new(
1503                    "external_bad",
1504                    "external failed",
1505                )),
1506            )
1507            .await
1508            .expect("resolve err");
1509        let err = durable
1510            .await_event_json(err_key)
1511            .await
1512            .expect_err("await err value");
1513        assert_eq!(err.code.as_str(), "external_bad");
1514
1515        let cancelled_key = durable
1516            .external_event_key("tool-event-cancelled")
1517            .await
1518            .expect("cancelled key");
1519        controller
1520            .resolve_await_event(&cancelled_key, crate::Resolution::Cancelled)
1521            .await
1522            .expect("resolve cancelled");
1523        let err = durable
1524            .await_event_json(cancelled_key)
1525            .await
1526            .expect_err("await cancelled value");
1527        assert_eq!(err.code.as_str(), "durable_effect_event_cancelled");
1528
1529        let timeout_key = durable
1530            .external_event_key("tool-event-timeout")
1531            .await
1532            .expect("timeout key");
1533        controller
1534            .resolve_await_event(&timeout_key, crate::Resolution::Timeout)
1535            .await
1536            .expect("resolve timeout");
1537        let err = durable
1538            .await_event_json(timeout_key)
1539            .await
1540            .expect_err("await timeout value");
1541        assert_eq!(err.code.as_str(), "durable_effect_event_timeout");
1542    }
1543
1544    #[tokio::test]
1545    async fn durable_emit_process_event_requires_process_and_appends_inside_process() {
1546        let context = test_context_with_controller(
1547            Some("call-no-process".to_string()),
1548            Arc::new(crate::InlineRuntimeEffectController),
1549        );
1550        let err = context
1551            .durable_effects()
1552            .expect("durable effects")
1553            .emit_process_event("tool.event", serde_json::json!({}))
1554            .await
1555            .expect_err("outside process");
1556        assert_eq!(
1557            err.code.as_str(),
1558            "durable_effect_process_event_unavailable"
1559        );
1560
1561        let registry = Arc::new(crate::TestLocalProcessRegistry::default());
1562        let process_id = "process:durable-tool-event";
1563        registry
1564            .register_process(
1565                crate::ProcessRegistration::new(
1566                    process_id,
1567                    crate::ProcessInput::External {
1568                        metadata: serde_json::json!({}),
1569                    },
1570                    crate::ProcessProvenance::host(),
1571                )
1572                .with_extra_event_types([crate::ProcessEventType {
1573                    name: "tool.event".to_string(),
1574                    payload_schema: crate::LashSchema::any(),
1575                    semantics: crate::ProcessEventSemanticsSpec::default(),
1576                }]),
1577            )
1578            .await
1579            .expect("register process");
1580        let registry_dyn: Arc<dyn crate::ProcessRegistry> = registry;
1581        let context = test_context_with_controller(
1582            Some("call-process-event".to_string()),
1583            Arc::new(crate::InlineRuntimeEffectController),
1584        )
1585        .with_process_events_for_testing(process_id, registry_dyn);
1586
1587        let event = context
1588            .durable_effects()
1589            .expect("durable effects")
1590            .emit_process_event("tool.event", serde_json::json!({ "ok": true }))
1591            .await
1592            .expect("process event");
1593        assert_eq!(event.process_id, process_id);
1594        assert_eq!(event.event_type, "tool.event");
1595        assert_eq!(event.payload, serde_json::json!({ "ok": true }));
1596        assert_eq!(
1597            event.invocation.replay_key(),
1598            Some("tool:call-process-event:durable-process-event:0")
1599        );
1600
1601        let append_err = context
1602            .durable_effects()
1603            .expect("durable effects")
1604            .emit_process_event("undeclared.event", serde_json::json!({}))
1605            .await
1606            .expect_err("undeclared event type must fail the append");
1607        assert_eq!(
1608            append_err.code.as_str(),
1609            "durable_effect_process_event_append_failed"
1610        );
1611    }
1612}