1use std::collections::HashSet;
2use std::future::Future;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5
6use lash_sansio::llm::types::ProviderReplayMeta;
7use serde::{Deserialize, Serialize};
8
9use crate::plugin::{
10 PluginError, SessionGraphService, SessionLifecycleService, SessionSnapshot, SessionStateService,
11};
12use crate::{AttachmentStore, ToolContract, ToolId, ToolManifest, ToolResult};
13
14mod attachments;
15mod direct_completion;
16mod dispatch;
17mod process;
18pub(crate) mod process_events;
19mod session;
20mod triggers;
21
22pub use attachments::ToolAttachmentClient;
23pub use direct_completion::ToolDirectCompletionClient;
24pub use dispatch::ToolDispatchClient;
25pub use process::ToolSessionProcessAdmin;
26pub use process_events::ToolProcessEventClient;
27pub use session::{ToolSessionAdmin, ToolSessionModel};
28pub use triggers::ToolTriggerClient;
29
/// A text message paired with a free-form `kind` label.
///
/// This is the item type carried by [`ProgressSender`].
#[derive(Clone, Debug)]
pub struct SandboxMessage {
    /// Message body.
    pub text: String,
    /// Label for the message. The set of accepted values is not defined in this file.
    pub kind: String,
}
37
/// Unbounded Tokio MPSC sender of [`SandboxMessage`] values; [`ToolCall::progress`] borrows one.
pub type ProgressSender = tokio::sync::mpsc::UnboundedSender<SandboxMessage>;
40
/// Shared slot holding at most one completion `AwaitEventKey`.
///
/// The slot sits behind an `Arc`, so clones of this state observe the same key.
#[derive(Clone, Default)]
pub(crate) struct ToolCompletionState {
    /// `None` until a key is stored, and `None` again once the key has been taken.
    key: Arc<Mutex<Option<crate::AwaitEventKey>>>,
}
45
46impl ToolCompletionState {
47 fn store(
48 &self,
49 key: crate::AwaitEventKey,
50 ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
51 let mut guard = self.key.lock().map_err(|_| {
52 crate::RuntimeError::new(
53 "tool_completion_state_poisoned",
54 "tool completion key state lock poisoned",
55 )
56 })?;
57 if let Some(existing) = guard.as_ref() {
58 return Ok(existing.clone());
59 }
60 *guard = Some(key.clone());
61 Ok(key)
62 }
63
64 pub(crate) fn take(&self) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
65 self.key.lock().map(|mut guard| guard.take()).map_err(|_| {
66 crate::RuntimeError::new(
67 "tool_completion_state_poisoned",
68 "tool completion key state lock poisoned",
69 )
70 })
71 }
72}
73
/// Bookkeeping shared by the durable effects of one tool context.
///
/// Both fields sit behind `Arc`s, so clones of this state share the same set and counter.
#[derive(Clone, Default)]
pub(crate) struct ToolDurableEffectState {
    /// Step ids that have already been reserved; used to reject duplicates.
    step_ids: Arc<Mutex<HashSet<String>>>,
    /// Counter that supplies the sequence number embedded in process-event replay keys.
    process_event_sequence: Arc<AtomicU64>,
}
79
80impl ToolDurableEffectState {
81 fn reserve_step(&self, step_id: &str) -> Result<(), crate::RuntimeError> {
82 let mut guard = self.step_ids.lock().map_err(|_| {
83 crate::RuntimeError::new(
84 "durable_effect_state_poisoned",
85 "durable effect step state lock poisoned",
86 )
87 })?;
88 if !guard.insert(step_id.to_string()) {
89 return Err(crate::RuntimeError::new(
90 "durable_effect_duplicate_step_id",
91 format!("durable effect step id `{step_id}` was already used by this tool call"),
92 ));
93 }
94 Ok(())
95 }
96
97 fn next_process_event_sequence(&self) -> u64 {
98 self.process_event_sequence.fetch_add(1, Ordering::Relaxed)
99 }
100}
101
/// Per-call state handed to a tool while it executes.
///
/// Holds the identifiers of the session and tool call together with handles to the runtime
/// services that the accessor methods on this type wrap.
#[derive(Clone)]
pub struct ToolContext<'run> {
    /// Session identifier, returned by `session_id()`.
    pub(crate) session_id: String,
    /// Agent frame identifier, returned by `agent_frame_id()`.
    pub(crate) agent_frame_id: crate::AgentFrameId,
    /// Session state service handed to the client built by `sessions()`.
    pub(crate) sessions: Arc<dyn SessionStateService>,
    /// Session lifecycle service handed to the client built by `sessions()`.
    pub(crate) session_lifecycle: Arc<dyn SessionLifecycleService>,
    /// Process service handed to the client built by `processes()`.
    pub(crate) processes: Arc<dyn crate::ProcessService>,
    /// Process cancellation capability handed to the client built by `processes()`.
    pub(crate) process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
    /// Handle to the effect controller used for await-event keys and durable effects.
    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
    /// Dispatch context this value was built from; `None` when built from explicit services.
    pub(crate) runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
    /// Runtime execution context, when one was supplied to the builder.
    pub(crate) runtime_execution_context: Option<crate::RuntimeExecutionContext<'run>>,
    /// Cancellation token exposed by `cancellation_token()`.
    pub(crate) cancellation_token: Option<tokio_util::sync::CancellationToken>,
    /// Process id recorded by `with_async_process` or the builder's `async_process`.
    pub(crate) async_process_id: Option<String>,
    /// Process id consulted by `runtime_process_id()` when no async process id is set.
    pub(crate) runtime_process_id: Option<String>,
    /// Process-event wiring; `None` unless configured.
    pub(crate) process_events: Option<ToolProcessEventContext>,
    /// Attachment store handed to the client built by `attachments()`.
    pub(crate) attachment_store: Arc<dyn AttachmentStore>,
    /// Direct-completion client handed to the client built by `direct_completions()`.
    pub(crate) direct_completions: crate::DirectCompletionClient<'run>,
    /// JSON payload attached to the prepared call; `Null` unless one was supplied.
    pub(crate) prepared_payload: serde_json::Value,
    /// Id of the prepared tool call, when known.
    pub(crate) tool_call_id: Option<String>,
    /// Attempt number; the builder sets 1 and `with_retry_context` never stores less than 1.
    pub(crate) attempt_number: u32,
    /// Attempt limit; the builder sets 1 and `with_retry_context` never stores less than 1.
    pub(crate) max_attempts: u32,
    /// Replay key derived by `with_retry_context`; `None` until then or without a call id.
    pub(crate) replay_key: Option<String>,
    /// Shared slot for the completion key created by `completion_key()`.
    pub(crate) completion: ToolCompletionState,
    /// Shared bookkeeping for durable steps and durable process events.
    pub(crate) durable_effects: ToolDurableEffectState,
    /// Invocation that durable effects are parented to, when present.
    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
    /// Execution environment spec handed to the client built by `processes()`.
    pub(crate) execution_env_spec: crate::ProcessExecutionEnvSpec,
    /// Hook notified by `emit_child_process_started`, when installed.
    pub(crate) child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
}
133
/// Event passed to a [`ToolChildExecutionTraceHook`] when a child process has been started.
///
/// Derives `Debug` so the event can be logged and used in assertion messages, like the
/// other public data types in this module.
#[derive(Clone, Debug)]
pub struct ToolChildProcessStarted {
    /// Id of the child process that was started.
    pub process_id: String,
    /// Entry name of the child, when the caller supplied one.
    pub child_entry_name: Option<String>,
}
139
/// Cloneable wrapper around a callback that receives [`ToolChildProcessStarted`] events.
#[derive(Clone)]
pub struct ToolChildExecutionTraceHook {
    /// The callback; stored in an `Arc`, so clones of the hook invoke the same closure.
    on_child_process_started: Arc<dyn Fn(ToolChildProcessStarted) + Send + Sync>,
}
144
145impl ToolChildExecutionTraceHook {
146 pub fn new(
147 on_child_process_started: impl Fn(ToolChildProcessStarted) + Send + Sync + 'static,
148 ) -> Self {
149 Self {
150 on_child_process_started: Arc::new(on_child_process_started),
151 }
152 }
153
154 pub fn child_process_started(&self, event: ToolChildProcessStarted) {
155 (self.on_child_process_started)(event);
156 }
157}
158
/// Wiring handed to `ToolProcessEventClient` for a specific process.
#[derive(Clone)]
pub(crate) struct ToolProcessEventContext {
    /// Id of the process. `ToolContext::runtime_process_id` falls back to it, and
    /// `ToolDurableEffects::emit_process_event` checks appended events against it.
    process_id: String,
    /// Process registry supplied by the caller.
    registry: Arc<dyn crate::ProcessRegistry>,
    /// Optional runtime persistence handle.
    store: Option<Arc<dyn crate::RuntimePersistence>>,
    /// Optional session store factory.
    session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
    /// Session graph service; the builder copies its own handle in here.
    session_graph: Arc<dyn SessionGraphService>,
    /// Optional queued-work driver.
    queued_work_driver: Option<crate::QueuedWorkDriver>,
}
168
/// Builder that accumulates the parts of a [`ToolContext`] and is consumed by `build`.
///
/// The fields mirror those of [`ToolContext`], except that `session_graph` exists only here
/// (it is copied into the process-event wiring) and the retry fields are filled in by `build`.
pub(crate) struct ToolContextBuilder<'run> {
    session_id: String,
    agent_frame_id: crate::AgentFrameId,
    sessions: Arc<dyn SessionStateService>,
    session_lifecycle: Arc<dyn SessionLifecycleService>,
    /// Only used to populate `ToolProcessEventContext`; not copied into the built context.
    session_graph: Arc<dyn SessionGraphService>,
    processes: Arc<dyn crate::ProcessService>,
    process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
    effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
    /// `Some` when the builder was created from a dispatch context.
    runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
    runtime_execution_context: Option<crate::RuntimeExecutionContext<'run>>,
    cancellation_token: Option<tokio_util::sync::CancellationToken>,
    async_process_id: Option<String>,
    runtime_process_id: Option<String>,
    process_events: Option<ToolProcessEventContext>,
    attachment_store: Arc<dyn AttachmentStore>,
    direct_completions: crate::DirectCompletionClient<'run>,
    /// Starts as `Null`; replaced by `prepared_call`.
    prepared_payload: serde_json::Value,
    tool_call_id: Option<String>,
    completion: ToolCompletionState,
    durable_effects: ToolDurableEffectState,
    parent_invocation: Option<crate::RuntimeInvocation>,
    execution_env_spec: crate::ProcessExecutionEnvSpec,
    child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
}
194
195impl<'run> ToolContextBuilder<'run> {
196 pub(crate) fn from_dispatch(
197 dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
198 ) -> Self {
199 Self {
200 session_id: dispatch.session_id.clone(),
201 agent_frame_id: dispatch.agent_frame_id.clone(),
202 sessions: Arc::clone(&dispatch.sessions),
203 session_lifecycle: Arc::clone(&dispatch.session_lifecycle),
204 session_graph: Arc::clone(&dispatch.session_graph),
205 processes: Arc::clone(&dispatch.processes),
206 process_cancel_ability: Arc::clone(&dispatch.process_cancel_ability),
207 effect_controller: dispatch.effect_controller.clone(),
208 runtime_dispatch: Some(Arc::clone(&dispatch)),
209 runtime_execution_context: None,
210 cancellation_token: None,
211 async_process_id: None,
212 runtime_process_id: None,
213 process_events: None,
214 attachment_store: Arc::clone(&dispatch.attachment_store),
215 direct_completions: dispatch.direct_completions.clone(),
216 prepared_payload: serde_json::Value::Null,
217 tool_call_id: None,
218 completion: ToolCompletionState::default(),
219 durable_effects: ToolDurableEffectState::default(),
220 parent_invocation: dispatch.parent_invocation.clone(),
221 execution_env_spec: dispatch.execution_env_spec.clone(),
222 child_execution_trace_hook: None,
223 }
224 }
225
226 #[cfg(any(test, feature = "testing"))]
227 pub(crate) fn tool_call_id(mut self, tool_call_id: impl Into<Option<String>>) -> Self {
228 self.tool_call_id = tool_call_id.into();
229 self
230 }
231
232 pub(crate) fn prepared_call(mut self, call: &PreparedToolCall) -> Self {
233 self.tool_call_id = Some(call.call_id.clone());
234 self.prepared_payload = call.prepared_payload.clone();
235 self
236 }
237
238 pub(crate) fn cancellation_token(
239 mut self,
240 cancellation_token: Option<tokio_util::sync::CancellationToken>,
241 ) -> Self {
242 self.cancellation_token = cancellation_token;
243 self
244 }
245
246 pub(crate) fn runtime_execution_context(
247 mut self,
248 context: crate::RuntimeExecutionContext<'run>,
249 ) -> Self {
250 self.runtime_execution_context = Some(context);
251 self
252 }
253
254 pub(crate) fn runtime_process_id(mut self, process_id: Option<String>) -> Self {
255 self.runtime_process_id = process_id;
256 self
257 }
258
259 pub(crate) fn async_process(
260 mut self,
261 process_id: impl Into<String>,
262 cancellation_token: tokio_util::sync::CancellationToken,
263 ) -> Self {
264 self.async_process_id = Some(process_id.into());
265 self.cancellation_token = Some(cancellation_token);
266 self
267 }
268
269 pub(crate) fn process_events(
270 mut self,
271 process_id: impl Into<String>,
272 registry: Arc<dyn crate::ProcessRegistry>,
273 store: Option<Arc<dyn crate::RuntimePersistence>>,
274 session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
275 queued_work_driver: Option<crate::QueuedWorkDriver>,
276 ) -> Self {
277 self.process_events = Some(ToolProcessEventContext {
278 process_id: process_id.into(),
279 registry,
280 store,
281 session_store_factory,
282 session_graph: Arc::clone(&self.session_graph),
283 queued_work_driver,
284 });
285 self
286 }
287
288 pub(crate) fn parent_invocation(mut self, metadata: Option<crate::RuntimeInvocation>) -> Self {
289 self.parent_invocation = metadata;
290 self
291 }
292
293 pub(crate) fn child_execution_trace_hook(
294 mut self,
295 hook: Option<ToolChildExecutionTraceHook>,
296 ) -> Self {
297 self.child_execution_trace_hook = hook;
298 self
299 }
300
301 pub(crate) fn build(self) -> ToolContext<'run> {
302 ToolContext {
303 session_id: self.session_id,
304 agent_frame_id: self.agent_frame_id,
305 sessions: self.sessions,
306 session_lifecycle: self.session_lifecycle,
307 processes: self.processes,
308 process_cancel_ability: self.process_cancel_ability,
309 effect_controller: self.effect_controller,
310 runtime_dispatch: self.runtime_dispatch,
311 runtime_execution_context: self.runtime_execution_context,
312 cancellation_token: self.cancellation_token,
313 async_process_id: self.async_process_id,
314 runtime_process_id: self.runtime_process_id,
315 process_events: self.process_events,
316 attachment_store: self.attachment_store,
317 direct_completions: self.direct_completions,
318 prepared_payload: self.prepared_payload,
319 tool_call_id: self.tool_call_id,
320 attempt_number: 1,
321 max_attempts: 1,
322 replay_key: None,
323 completion: self.completion,
324 durable_effects: self.durable_effects,
325 parent_invocation: self.parent_invocation,
326 execution_env_spec: self.execution_env_spec,
327 child_execution_trace_hook: self.child_execution_trace_hook,
328 }
329 }
330}
331
332impl<'run> ToolContext<'run> {
333 #[cfg(any(test, feature = "testing"))]
334 #[expect(
335 clippy::too_many_arguments,
336 reason = "testing constructor mirrors the sealed runtime tool context dependencies"
337 )]
338 pub(crate) fn builder(
339 session_id: String,
340 sessions: Arc<dyn SessionStateService>,
341 session_lifecycle: Arc<dyn SessionLifecycleService>,
342 session_graph: Arc<dyn SessionGraphService>,
343 processes: Arc<dyn crate::ProcessService>,
344 process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
345 effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
346 attachment_store: Arc<dyn AttachmentStore>,
347 direct_completions: crate::DirectCompletionClient<'run>,
348 ) -> ToolContextBuilder<'run> {
349 ToolContextBuilder {
350 session_id,
351 agent_frame_id: String::new(),
352 sessions,
353 session_lifecycle,
354 session_graph,
355 processes,
356 process_cancel_ability,
357 effect_controller,
358 runtime_dispatch: None,
359 runtime_execution_context: None,
360 cancellation_token: None,
361 async_process_id: None,
362 runtime_process_id: None,
363 process_events: None,
364 attachment_store,
365 direct_completions,
366 prepared_payload: serde_json::Value::Null,
367 tool_call_id: None,
368 completion: ToolCompletionState::default(),
369 durable_effects: ToolDurableEffectState::default(),
370 parent_invocation: None,
371 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
372 crate::PluginOptions::default(),
373 crate::SessionPolicy::default(),
374 ),
375 child_execution_trace_hook: None,
376 }
377 }
378
379 pub(crate) fn from_dispatch(
380 dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
381 ) -> ToolContextBuilder<'run> {
382 ToolContextBuilder::from_dispatch(dispatch)
383 }
384
385 pub fn session_id(&self) -> &str {
386 &self.session_id
387 }
388
389 pub fn agent_frame_id(&self) -> &str {
390 &self.agent_frame_id
391 }
392
393 pub fn sessions(&self) -> ToolSessionAdmin<'run> {
394 ToolSessionAdmin {
395 session_id: self.session_id.clone(),
396 sessions: Arc::clone(&self.sessions),
397 session_lifecycle: Arc::clone(&self.session_lifecycle),
398 effect_controller: self.effect_controller.clone(),
399 }
400 }
401
402 pub fn dispatch(&self) -> ToolDispatchClient<'run> {
403 ToolDispatchClient {
404 context: self.clone(),
405 }
406 }
407
408 pub fn triggers(&self) -> ToolTriggerClient<'run> {
409 ToolTriggerClient {
410 context: self.clone(),
411 }
412 }
413
414 pub fn processes(&self) -> ToolSessionProcessAdmin<'run> {
415 ToolSessionProcessAdmin {
416 session_id: self.session_id.clone(),
417 agent_frame_id: self.agent_frame_id.clone(),
418 processes: Arc::clone(&self.processes),
419 process_cancel_ability: Arc::clone(&self.process_cancel_ability),
420 effect_controller: self.effect_controller.clone(),
421 parent_invocation: self.parent_invocation.clone(),
422 tool_call_id: self.tool_call_id.clone(),
423 execution_env_spec: self.execution_env_spec.clone(),
424 }
425 }
426
427 pub fn emit_child_process_started(
428 &self,
429 process_id: impl Into<String>,
430 child_entry_name: Option<String>,
431 ) {
432 let Some(hook) = &self.child_execution_trace_hook else {
433 return;
434 };
435 hook.child_process_started(ToolChildProcessStarted {
436 process_id: process_id.into(),
437 child_entry_name,
438 });
439 }
440
441 pub fn direct_completions(&self) -> ToolDirectCompletionClient<'run> {
442 ToolDirectCompletionClient {
443 session_id: self.session_id.clone(),
444 tool_call_id: self.tool_call_id.clone(),
445 direct_completions: self.direct_completions.clone(),
446 parent_invocation: self.parent_invocation.clone(),
447 parent_tool_attempt_is_durable: self.parent_invocation.as_ref().is_some_and(
448 |invocation| {
449 invocation.effect_kind() == Some(crate::RuntimeEffectKind::ToolAttempt)
450 },
451 ) && self
452 .effect_controller
453 .controller()
454 .durability_tier()
455 == crate::DurabilityTier::Durable,
456 }
457 }
458
459 pub fn attachments(&self) -> ToolAttachmentClient {
460 ToolAttachmentClient {
461 store: Arc::clone(&self.attachment_store),
462 }
463 }
464
465 pub fn process_events(&self) -> ToolProcessEventClient {
466 ToolProcessEventClient {
467 context: self.process_events.clone(),
468 }
469 }
470
471 pub fn durable_effects(&self) -> Result<ToolDurableEffects<'_, 'run>, crate::RuntimeError> {
478 let Some(tool_call_id) = self.tool_call_id.as_deref() else {
479 return Err(crate::RuntimeError::new(
480 "durable_effects_missing_call_id",
481 "durable effects require a prepared tool call id",
482 ));
483 };
484 if tool_call_id.trim().is_empty() {
485 return Err(crate::RuntimeError::new(
486 "durable_effects_missing_call_id",
487 "durable effects require a non-empty prepared tool call id",
488 ));
489 }
490 let scoped = self.effect_controller.scoped();
491 if !scoped.controller().supports_durable_effects() {
492 return Err(crate::RuntimeError::new(
493 "durable_effects_unavailable",
494 "this effect controller does not support durable tool effects",
495 ));
496 }
497 if self.parent_invocation.as_ref().is_some_and(|invocation| {
498 invocation.effect_kind() == Some(crate::RuntimeEffectKind::ToolAttempt)
499 }) && scoped.controller().durability_tier() == crate::DurabilityTier::Durable
500 {
501 return Err(crate::RuntimeError::new(
502 "durable_effects_unavailable_in_tool_attempt",
503 "durable tool sub-effects are not available inside a journaled tool attempt",
504 ));
505 }
506 Ok(ToolDurableEffects { context: self })
507 }
508
509 pub fn cancellation_token(&self) -> Option<&tokio_util::sync::CancellationToken> {
510 self.cancellation_token.as_ref()
511 }
512
513 pub fn async_process_id(&self) -> Option<&str> {
514 self.async_process_id.as_deref()
515 }
516
517 pub fn runtime_process_id(&self) -> Option<&str> {
518 self.async_process_id
519 .as_deref()
520 .or(self.runtime_process_id.as_deref())
521 .or_else(|| {
522 self.process_events
523 .as_ref()
524 .map(|context| context.process_id.as_str())
525 })
526 }
527
528 pub fn tool_call_id(&self) -> Option<&str> {
529 self.tool_call_id.as_deref()
530 }
531
532 pub fn prepared_payload(&self) -> &serde_json::Value {
533 &self.prepared_payload
534 }
535
536 pub fn decode_prepared_payload<T>(&self) -> Result<T, serde_json::Error>
537 where
538 T: serde::de::DeserializeOwned,
539 {
540 serde_json::from_value(self.prepared_payload.clone())
541 }
542
543 pub fn attempt_number(&self) -> u32 {
544 self.attempt_number
545 }
546
547 pub fn max_attempts(&self) -> u32 {
548 self.max_attempts
549 }
550
551 pub fn replay_key(&self) -> Option<&str> {
552 self.replay_key.as_deref()
553 }
554
555 pub async fn completion_key(&self) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
569 let tool_call_id = self.tool_call_id.clone().ok_or_else(|| {
570 crate::RuntimeError::new(
571 "tool_completion_key_missing_call_id",
572 "completion keys require a prepared tool call id",
573 )
574 })?;
575 let scoped = self.effect_controller.scoped();
576 let key = scoped
577 .controller()
578 .await_event_key(
579 scoped.execution_scope(),
580 crate::AwaitEventWaitIdentity::tool_completion(tool_call_id),
581 )
582 .await?;
583 self.completion.store(key)
584 }
585
586 pub(crate) fn take_completion_key(
587 &self,
588 ) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
589 self.completion.take()
590 }
591
592 pub fn with_async_process(
593 mut self,
594 process_id: impl Into<String>,
595 cancellation_token: tokio_util::sync::CancellationToken,
596 ) -> Self {
597 self.async_process_id = Some(process_id.into());
598 self.runtime_process_id = self.async_process_id.clone();
599 self.cancellation_token = Some(cancellation_token);
600 self
601 }
602
603 #[cfg(any(test, feature = "testing"))]
604 #[doc(hidden)]
605 pub fn with_process_events_for_testing(
606 mut self,
607 process_id: impl Into<String>,
608 registry: Arc<dyn crate::ProcessRegistry>,
609 ) -> Self {
610 self.process_events = Some(ToolProcessEventContext {
611 process_id: process_id.into(),
612 registry,
613 store: None,
614 session_store_factory: None,
615 session_graph: Arc::new(crate::plugin::NoopSessionManager),
616 queued_work_driver: None,
617 });
618 self
619 }
620
621 pub(crate) fn with_retry_context(
622 mut self,
623 tool_name: &str,
624 attempt_number: u32,
625 max_attempts: u32,
626 ) -> Self {
627 self.attempt_number = attempt_number.max(1);
628 self.max_attempts = max_attempts.max(1);
629 self.replay_key = self
630 .tool_call_id
631 .as_ref()
632 .map(|call_id| format!("lash-tool:{}:{call_id}:{tool_name}", self.session_id));
633 self
634 }
635
636 pub(crate) fn with_prepared_payload(mut self, payload: serde_json::Value) -> Self {
637 self.prepared_payload = payload;
638 self
639 }
640
641 #[cfg(any(test, feature = "testing"))]
644 #[doc(hidden)]
645 #[expect(
646 clippy::too_many_arguments,
647 reason = "test-only constructor mirrors the sealed runtime tool context"
648 )]
649 pub fn __for_testing(
650 session_id: String,
651 sessions: Arc<dyn SessionStateService>,
652 session_lifecycle: Arc<dyn SessionLifecycleService>,
653 session_graph: Arc<dyn SessionGraphService>,
654 processes: Arc<dyn crate::ProcessService>,
655 attachment_store: Arc<dyn AttachmentStore>,
656 direct_completions: crate::DirectCompletionClient<'static>,
657 tool_call_id: Option<String>,
658 ) -> ToolContext<'static> {
659 ToolContext::builder(
660 session_id,
661 sessions,
662 session_lifecycle,
663 session_graph,
664 processes,
665 Arc::new(crate::DefaultProcessCancelAbility),
666 crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
667 crate::InlineRuntimeEffectController,
668 )),
669 attachment_store,
670 direct_completions,
671 )
672 .tool_call_id(tool_call_id)
673 .build()
674 }
675
676 #[cfg(any(test, feature = "testing"))]
680 #[doc(hidden)]
681 #[expect(
682 clippy::too_many_arguments,
683 reason = "test-only constructor mirrors the sealed runtime context"
684 )]
685 pub fn __for_testing_with_process_cancel_ability(
686 session_id: String,
687 sessions: Arc<dyn SessionStateService>,
688 session_lifecycle: Arc<dyn SessionLifecycleService>,
689 session_graph: Arc<dyn SessionGraphService>,
690 processes: Arc<dyn crate::ProcessService>,
691 process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
692 attachment_store: Arc<dyn AttachmentStore>,
693 direct_completions: crate::DirectCompletionClient<'static>,
694 tool_call_id: Option<String>,
695 ) -> ToolContext<'static> {
696 ToolContext::builder(
697 session_id,
698 sessions,
699 session_lifecycle,
700 session_graph,
701 processes,
702 process_cancel_ability,
703 crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
704 crate::InlineRuntimeEffectController,
705 )),
706 attachment_store,
707 direct_completions,
708 )
709 .tool_call_id(tool_call_id)
710 .build()
711 }
712}
713
/// Durable-effects API for one tool call, obtained from `ToolContext::durable_effects`.
pub struct ToolDurableEffects<'ctx, 'run> {
    /// The tool context whose call id, effect controller and bookkeeping are used.
    context: &'ctx ToolContext<'run>,
}
722
723impl<'ctx, 'run> ToolDurableEffects<'ctx, 'run> {
724 pub async fn run_json<F, Fut>(
725 &self,
726 step_id: impl Into<String>,
727 input: serde_json::Value,
728 run: F,
729 ) -> Result<serde_json::Value, crate::RuntimeError>
730 where
731 F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
732 Fut: Future<Output = Result<serde_json::Value, crate::RuntimeError>> + Send + 'run,
733 {
734 let step_id = step_id.into();
735 if step_id.trim().is_empty() {
736 return Err(crate::RuntimeError::new(
737 "durable_effect_empty_step_id",
738 "durable effect step id must be non-empty",
739 ));
740 }
741 self.context.durable_effects.reserve_step(&step_id)?;
742 let invocation = self.step_invocation(
743 format!("durable-step:{step_id}"),
744 crate::RuntimeEffectKind::DurableStep,
745 format!("durable-step:{step_id}"),
746 )?;
747 let outcome = self
748 .context
749 .effect_controller
750 .controller()
751 .execute_effect(
752 crate::RuntimeEffectEnvelope::new(
753 invocation,
754 crate::RuntimeEffectCommand::DurableStep {
755 step_id,
756 input: input.clone(),
757 },
758 ),
759 crate::RuntimeEffectLocalExecutor::durable_step(run),
760 )
761 .await
762 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
763 outcome
764 .into_durable_step()
765 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)
766 }
767
768 pub async fn external_event_key(
769 &self,
770 key: impl Into<String>,
771 ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
772 let key = key.into();
773 if key.trim().is_empty() {
774 return Err(crate::RuntimeError::new(
775 "durable_effect_empty_event_key",
776 "durable effect external event key must be non-empty",
777 ));
778 }
779 let scoped = self.context.effect_controller.scoped();
780 scoped
781 .controller()
782 .await_event_key(
783 scoped.execution_scope(),
784 crate::AwaitEventWaitIdentity::Custom { key },
785 )
786 .await
787 }
788
789 pub async fn await_event_json(
790 &self,
791 key: crate::AwaitEventKey,
792 ) -> Result<serde_json::Value, crate::RuntimeError> {
793 let invocation = self.step_invocation(
794 format!("await-event:{}", key.key_id),
795 crate::RuntimeEffectKind::AwaitEvent,
796 format!("await-event:{}", key.key_id),
797 )?;
798 let cancellation = self.context.cancellation_token.clone().unwrap_or_default();
799 let clock = self
800 .context
801 .runtime_dispatch
802 .as_ref()
803 .map(|dispatch| Arc::clone(&dispatch.clock))
804 .unwrap_or_else(|| Arc::new(crate::SystemClock));
805 let outcome = self
806 .context
807 .effect_controller
808 .controller()
809 .execute_effect(
810 crate::RuntimeEffectEnvelope::new(
811 invocation,
812 crate::RuntimeEffectCommand::AwaitEvent { key },
813 ),
814 crate::RuntimeEffectLocalExecutor::await_event_with_clock(
815 cancellation,
816 None,
817 clock,
818 ),
819 )
820 .await
821 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
822 match outcome
823 .into_await_event()
824 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?
825 {
826 crate::Resolution::Ok(value) => Ok(value),
827 crate::Resolution::Err(err) => Err(crate::RuntimeError::new(err.code, err.message)),
828 crate::Resolution::Timeout => Err(crate::RuntimeError::new(
829 "durable_effect_event_timeout",
830 "durable effect external event wait timed out",
831 )),
832 crate::Resolution::Cancelled => Err(crate::RuntimeError::new(
833 "durable_effect_event_cancelled",
834 "durable effect external event wait was cancelled",
835 )),
836 }
837 }
838
839 pub async fn emit_process_event(
840 &self,
841 event_type: impl Into<String>,
842 payload: serde_json::Value,
843 ) -> Result<crate::ProcessEvent, crate::RuntimeError> {
844 let Some(process) = self.context.process_events.as_ref() else {
845 return Err(crate::RuntimeError::new(
846 "durable_effect_process_event_unavailable",
847 "durable effect process events are unavailable outside a durable process",
848 ));
849 };
850 let event_type = event_type.into();
851 if event_type.trim().is_empty() {
852 return Err(crate::RuntimeError::new(
853 "durable_effect_empty_process_event_type",
854 "durable effect process event type must be non-empty",
855 ));
856 }
857 let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
858 crate::RuntimeError::new(
859 "durable_effects_missing_call_id",
860 "durable effects require a prepared tool call id",
861 )
862 })?;
863 let sequence = self.context.durable_effects.next_process_event_sequence();
864 let request = crate::ProcessEventAppendRequest::new(event_type, payload).with_replay_key(
865 format!("tool:{tool_call_id}:durable-process-event:{sequence}"),
866 );
867 self.context
868 .process_events()
869 .emit_request(request)
870 .await
871 .map_err(|err| {
872 crate::RuntimeError::new(
873 "durable_effect_process_event_append_failed",
874 err.to_string(),
875 )
876 })
877 .and_then(|event| {
878 if event.process_id == process.process_id {
879 Ok(event)
880 } else {
881 Err(crate::RuntimeError::new(
882 "durable_effect_process_event_process_mismatch",
883 "process event append returned an event for a different process",
884 ))
885 }
886 })
887 }
888
889 fn step_invocation(
890 &self,
891 effect_id_suffix: impl Into<String>,
892 kind: crate::RuntimeEffectKind,
893 replay_suffix: impl AsRef<str>,
894 ) -> Result<crate::RuntimeInvocation, crate::RuntimeError> {
895 let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
896 crate::RuntimeError::new(
897 "durable_effects_missing_call_id",
898 "durable effects require a prepared tool call id",
899 )
900 })?;
901 let effect_id_suffix = effect_id_suffix.into();
902 if let Some(parent) = self.context.parent_invocation.as_ref() {
903 return Ok(crate::runtime::causal::child_effect_invocation(
904 parent,
905 format!("{tool_call_id}:{effect_id_suffix}"),
906 kind,
907 replay_suffix,
908 ));
909 }
910 let scoped = self.context.effect_controller.scoped();
911 let replay_key = format!(
912 "{}:tool:{tool_call_id}:{}",
913 scoped.scope_id(),
914 replay_suffix.as_ref()
915 );
916 Ok(crate::RuntimeInvocation::effect(
917 crate::RuntimeScope::new(self.context.session_id.clone()),
918 format!("{tool_call_id}:{effect_id_suffix}"),
919 kind,
920 replay_key,
921 ))
922 }
923}
924
/// A tool call after preparation: the pending call's data plus an optional prepared payload.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PreparedToolCall {
    /// Id of the call.
    pub call_id: String,
    /// Id of the tool being called.
    pub tool_id: ToolId,
    /// Name of the tool being called.
    pub tool_name: String,
    /// JSON arguments of the call.
    pub args: serde_json::Value,
    /// Provider replay metadata; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub replay: Option<ProviderReplayMeta>,
    /// Payload attached during preparation; omitted from serialized output when `Null`.
    #[serde(default, skip_serializing_if = "serde_json::Value::is_null")]
    pub prepared_payload: serde_json::Value,
}
941
942impl PreparedToolCall {
943 pub fn identity(tool_id: ToolId, call: crate::sansio::PendingToolCall) -> Self {
944 Self {
945 call_id: call.call_id,
946 tool_id,
947 tool_name: call.tool_name,
948 args: call.args,
949 replay: call.replay,
950 prepared_payload: serde_json::Value::Null,
951 }
952 }
953
954 pub fn from_parts(
955 call_id: impl Into<String>,
956 tool_id: impl Into<ToolId>,
957 tool_name: impl Into<String>,
958 args: serde_json::Value,
959 replay: Option<ProviderReplayMeta>,
960 prepared_payload: serde_json::Value,
961 ) -> Self {
962 Self {
963 call_id: call_id.into(),
964 tool_id: tool_id.into(),
965 tool_name: tool_name.into(),
966 args,
967 replay,
968 prepared_payload,
969 }
970 }
971}
972
/// One entry of a [`PreparedToolBatch`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PreparedToolBatchCall {
    /// The prepared call.
    pub call: PreparedToolCall,
    /// Replay suffix for this entry; `PreparedToolBatch::new` sets it to
    /// `child:{index}:{call_id}`.
    pub replay_suffix: String,
}
983
/// A group of prepared tool calls that share a batch id.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PreparedToolBatch {
    /// Id of the batch.
    pub batch_id: String,
    /// The calls of the batch, in the order they were supplied.
    pub calls: Vec<PreparedToolBatchCall>,
}
994
995impl PreparedToolBatch {
996 pub fn new(batch_id: impl Into<String>, calls: Vec<PreparedToolCall>) -> Self {
997 let batch_id = batch_id.into();
998 let calls = calls
999 .into_iter()
1000 .enumerate()
1001 .map(|(index, call)| PreparedToolBatchCall {
1002 replay_suffix: format!("child:{index}:{}", call.call_id),
1003 call,
1004 })
1005 .collect();
1006 Self { batch_id, calls }
1007 }
1008
1009 pub fn is_empty(&self) -> bool {
1010 self.calls.is_empty()
1011 }
1012
1013 pub fn len(&self) -> usize {
1014 self.calls.len()
1015 }
1016}
1017
/// Context available to a tool provider while it prepares a call.
#[derive(Clone)]
pub struct ToolPrepareContext {
    /// Id of the session the call belongs to.
    session_id: String,
    /// Session state service used for snapshots and tool catalogs.
    sessions: Arc<dyn SessionStateService>,
    /// Turn context, also the source of plugin inputs.
    turn_context: crate::TurnContext,
    /// Id of the call being prepared, when known.
    tool_call_id: Option<String>,
}
1025
1026impl ToolPrepareContext {
1027 pub(crate) fn new(
1028 session_id: String,
1029 sessions: Arc<dyn SessionStateService>,
1030 turn_context: crate::TurnContext,
1031 tool_call_id: Option<String>,
1032 ) -> Self {
1033 Self {
1034 session_id,
1035 sessions,
1036 turn_context,
1037 tool_call_id,
1038 }
1039 }
1040
1041 pub fn session_id(&self) -> &str {
1042 &self.session_id
1043 }
1044
1045 pub fn tool_call_id(&self) -> Option<&str> {
1046 self.tool_call_id.as_deref()
1047 }
1048
1049 pub fn turn_context(&self) -> &crate::TurnContext {
1050 &self.turn_context
1051 }
1052
1053 pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
1054 where
1055 T: 'static,
1056 {
1057 self.turn_context.plugin_input::<T>(plugin_id)
1058 }
1059
1060 pub async fn session_snapshot(&self) -> Result<SessionSnapshot, PluginError> {
1061 self.sessions.snapshot_session(&self.session_id).await
1062 }
1063
1064 pub async fn tool_catalog(&self) -> Result<Vec<serde_json::Value>, PluginError> {
1065 self.sessions.tool_catalog(&self.session_id).await
1066 }
1067
1068 pub async fn shared_tool_catalog(
1069 &self,
1070 ) -> Result<std::sync::Arc<Vec<serde_json::Value>>, PluginError> {
1071 self.sessions.shared_tool_catalog(&self.session_id).await
1072 }
1073}
1074
/// Arguments of [`ToolProvider::prepare_tool_call`].
pub struct ToolPrepareCall<'a> {
    /// Id of the tool the pending call targets.
    pub tool_id: ToolId,
    /// The pending call to prepare.
    pub pending: crate::sansio::PendingToolCall,
    /// Context available during preparation.
    pub context: &'a ToolPrepareContext,
}
1081
1082pub struct ToolCall<'a> {
1089 pub name: &'a str,
1090 pub args: &'a serde_json::Value,
1091 pub context: &'a ToolContext<'a>,
1092 pub progress: Option<&'a ProgressSender>,
1093}
1094
1095#[async_trait::async_trait]
1103pub trait ToolProvider: Send + Sync + 'static {
1104 fn tool_manifests(&self) -> Vec<ToolManifest>;
1105 fn resolve_manifest(&self, name: &str) -> Option<ToolManifest> {
1106 self.tool_manifests()
1107 .into_iter()
1108 .find(|manifest| manifest.name == name)
1109 }
1110 fn resolve_manifest_by_id(&self, id: &ToolId) -> Option<ToolManifest> {
1111 self.tool_manifests()
1112 .into_iter()
1113 .find(|manifest| manifest.id == *id)
1114 }
1115 fn resolve_contract(&self, name: &str) -> Option<Arc<ToolContract>>;
1116 fn resolve_contract_by_id(&self, id: &ToolId) -> Option<Arc<ToolContract>> {
1117 let manifest = self.resolve_manifest_by_id(id)?;
1118 self.resolve_contract(&manifest.name)
1119 }
1120 async fn prepare_tool_call(
1121 &self,
1122 call: ToolPrepareCall<'_>,
1123 ) -> Result<PreparedToolCall, ToolResult> {
1124 Ok(PreparedToolCall::identity(call.tool_id, call.pending))
1125 }
1126 async fn execute(&self, call: ToolCall<'_>) -> ToolResult;
1127 async fn execute_by_id(
1128 &self,
1129 tool_id: &ToolId,
1130 args: &serde_json::Value,
1131 context: &ToolContext<'_>,
1132 progress: Option<&ProgressSender>,
1133 ) -> ToolResult {
1134 let Some(manifest) = self.resolve_manifest_by_id(tool_id) else {
1135 return ToolResult::err_fmt(format!("Unknown tool id: {tool_id}"));
1136 };
1137 self.execute(ToolCall {
1138 name: &manifest.name,
1139 args,
1140 context,
1141 progress,
1142 })
1143 .await
1144 }
1145}
1146
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ProcessRegistry;
    use crate::RuntimeEffectController;
    use std::sync::atomic::{AtomicU64, Ordering};

    // Effect controller whose `execute_effect` always returns an error.
    struct NoDurableEffectController;

    #[async_trait::async_trait]
    impl crate::RuntimeEffectController for NoDurableEffectController {
        async fn execute_effect(
            &self,
            _envelope: crate::RuntimeEffectEnvelope,
            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
            let rejection = crate::RuntimeEffectControllerError::new(
                "unexpected_effect",
                "test controller should not execute effects",
            );
            Err(rejection)
        }
    }

    // Builds a context for "session-1" backed by mock services and the given
    // effect controller.
    fn test_context_with_controller(
        tool_call_id: Option<String>,
        controller: Arc<dyn crate::RuntimeEffectController>,
    ) -> ToolContext<'static> {
        let session_id = String::from("session-1");
        let builder = ToolContext::builder(
            session_id,
            Arc::new(crate::testing::MockSessionManager::default()),
            Arc::new(crate::testing::MockSessionManager::default()),
            Arc::new(crate::testing::MockSessionManager::default()),
            Arc::new(crate::UnavailableProcessService),
            Arc::new(crate::DefaultProcessCancelAbility),
            crate::runtime::RuntimeEffectControllerHandle::shared(controller),
            Arc::new(crate::InMemoryAttachmentStore::new()),
            crate::DirectCompletionClient::unavailable(
                "direct completions are unavailable in this test context",
            ),
        );
        builder.tool_call_id(tool_call_id).build()
    }

    // Shorthand for a context with the given call id and the inline controller.
    fn inline_context(call_id: &str) -> ToolContext<'static> {
        test_context_with_controller(
            Some(call_id.to_string()),
            Arc::new(crate::InlineRuntimeEffectController),
        )
    }

    #[test]
    fn tool_context_builder_carries_call_payload_and_cancellation_state() {
        let token = tokio_util::sync::CancellationToken::new();
        let prepared_call = PreparedToolCall::from_parts(
            "call-1",
            "tool:demo_tool",
            "demo_tool",
            serde_json::json!({ "input": true }),
            None,
            serde_json::json!({ "prepared": true }),
        );

        let context = ToolContext::builder(
            "session-1".to_string(),
            Arc::new(crate::testing::MockSessionManager::default()),
            Arc::new(crate::testing::MockSessionManager::default()),
            Arc::new(crate::testing::MockSessionManager::default()),
            Arc::new(crate::UnavailableProcessService),
            Arc::new(crate::DefaultProcessCancelAbility),
            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
                crate::InlineRuntimeEffectController,
            )),
            Arc::new(crate::InMemoryAttachmentStore::new()),
            crate::DirectCompletionClient::unavailable(
                "direct completions are unavailable in this test context",
            ),
        )
        .prepared_call(&prepared_call)
        .cancellation_token(Some(token.clone()))
        .async_process("process-1", token.clone())
        .build();

        // Everything supplied to the builder must be readable back.
        let expected_payload = serde_json::json!({ "prepared": true });
        assert_eq!(context.session_id(), "session-1");
        assert_eq!(context.tool_call_id(), Some("call-1"));
        assert_eq!(context.prepared_payload(), &expected_payload);
        assert_eq!(context.async_process_id(), Some("process-1"));
        assert!(context.cancellation_token().is_some());
    }

    #[test]
    fn durable_effects_requires_prepared_call_id_and_supporting_controller() {
        // No tool call id.
        let missing_call =
            test_context_with_controller(None, Arc::new(crate::InlineRuntimeEffectController));
        let Err(missing_err) = missing_call.durable_effects() else {
            panic!("missing prepared tool call id should fail");
        };
        assert_eq!(missing_err.code.as_str(), "durable_effects_missing_call_id");

        // Call id present, but the controller is the always-failing stub.
        let unsupported = test_context_with_controller(
            Some("call-1".to_string()),
            Arc::new(NoDurableEffectController),
        );
        let Err(unsupported_err) = unsupported.durable_effects() else {
            panic!("unsupported controller should fail");
        };
        assert_eq!(unsupported_err.code.as_str(), "durable_effects_unavailable");
    }

    #[tokio::test]
    async fn durable_run_json_executes_and_maps_closure_errors() {
        let context = inline_context("call-run-json");
        let durable = context.durable_effects().expect("durable effects");

        // Successful step: the closure's output is returned.
        let step_input = serde_json::json!({ "x": 1 });
        let echoed = durable
            .run_json(
                "create",
                step_input,
                |input| async move { Ok(serde_json::json!({ "seen": input["x"] })) },
            )
            .await
            .expect("durable step");
        assert_eq!(echoed, serde_json::json!({ "seen": 1 }));

        // Failing step: the closure's error code and message are preserved.
        let failure = durable
            .run_json("fail", serde_json::json!({}), |_| async {
                Err(crate::RuntimeError::new(
                    "durable_step_failed",
                    "step failed",
                ))
            })
            .await
            .expect_err("closure error");
        assert_eq!(failure.code.as_str(), "durable_step_failed");
        assert_eq!(failure.message, "step failed");
    }

    #[tokio::test]
    async fn durable_run_json_rejects_empty_or_duplicate_step_ids_before_running() {
        let context = inline_context("call-step-ids");
        let durable = context.durable_effects().expect("durable effects");
        // Counts how many step closures actually ran.
        let executions = Arc::new(AtomicU64::new(0));

        let empty_id_err = durable
            .run_json("", serde_json::Value::Null, {
                let executions = Arc::clone(&executions);
                move |_| async move {
                    executions.fetch_add(1, Ordering::Relaxed);
                    Ok(serde_json::Value::Null)
                }
            })
            .await
            .expect_err("empty step id");
        assert_eq!(empty_id_err.code.as_str(), "durable_effect_empty_step_id");
        assert_eq!(executions.load(Ordering::Relaxed), 0);

        durable
            .run_json("same", serde_json::Value::Null, {
                let executions = Arc::clone(&executions);
                move |_| async move {
                    executions.fetch_add(1, Ordering::Relaxed);
                    Ok(serde_json::Value::Null)
                }
            })
            .await
            .expect("first step");

        // Reusing the id must fail without running the second closure.
        let duplicate_err = durable
            .run_json("same", serde_json::Value::Null, {
                let executions = Arc::clone(&executions);
                move |_| async move {
                    executions.fetch_add(1, Ordering::Relaxed);
                    Ok(serde_json::Value::Null)
                }
            })
            .await
            .expect_err("duplicate step id");
        assert_eq!(
            duplicate_err.code.as_str(),
            "durable_effect_duplicate_step_id"
        );
        assert_eq!(executions.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn durable_external_event_key_is_custom_and_stable() {
        let context = inline_context("call-event-key");
        let durable = context.durable_effects().expect("durable effects");

        let initial_key = durable
            .external_event_key("tool-event-stable")
            .await
            .expect("first key");
        let repeated_key = durable
            .external_event_key("tool-event-stable")
            .await
            .expect("second key");

        let expected_wait = crate::AwaitEventWaitIdentity::Custom {
            key: "tool-event-stable".to_string(),
        };
        assert_eq!(initial_key, repeated_key);
        assert_eq!(initial_key.wait, expected_wait);
    }

    #[tokio::test]
    async fn durable_await_event_json_maps_terminal_resolutions() {
        let controller = Arc::new(crate::InlineRuntimeEffectController);
        let context =
            test_context_with_controller(Some("call-await-event".to_string()), controller.clone());
        let durable = context.durable_effects().expect("durable effects");

        // Ok resolution yields the payload.
        let ok_key = durable
            .external_event_key("tool-event-ok")
            .await
            .expect("ok key");
        let ok_resolution = crate::Resolution::Ok(serde_json::json!({ "answer": 42 }));
        controller
            .resolve_await_event(&ok_key, ok_resolution)
            .await
            .expect("resolve ok");
        let answer = durable
            .await_event_json(ok_key)
            .await
            .expect("await ok value");
        assert_eq!(answer, serde_json::json!({ "answer": 42 }));

        // Err resolution surfaces the external error code.
        let err_key = durable
            .external_event_key("tool-event-err")
            .await
            .expect("err key");
        let err_resolution = crate::Resolution::Err(crate::ExternalCompletionError::new(
            "external_bad",
            "external failed",
        ));
        controller
            .resolve_await_event(&err_key, err_resolution)
            .await
            .expect("resolve err");
        let external_err = durable
            .await_event_json(err_key)
            .await
            .expect_err("await err value");
        assert_eq!(external_err.code.as_str(), "external_bad");

        // Cancelled resolution maps to a dedicated error code.
        let cancelled_key = durable
            .external_event_key("tool-event-cancelled")
            .await
            .expect("cancelled key");
        controller
            .resolve_await_event(&cancelled_key, crate::Resolution::Cancelled)
            .await
            .expect("resolve cancelled");
        let cancelled_err = durable
            .await_event_json(cancelled_key)
            .await
            .expect_err("await cancelled value");
        assert_eq!(
            cancelled_err.code.as_str(),
            "durable_effect_event_cancelled"
        );

        // Timeout resolution maps to a dedicated error code.
        let timeout_key = durable
            .external_event_key("tool-event-timeout")
            .await
            .expect("timeout key");
        controller
            .resolve_await_event(&timeout_key, crate::Resolution::Timeout)
            .await
            .expect("resolve timeout");
        let timeout_err = durable
            .await_event_json(timeout_key)
            .await
            .expect_err("await timeout value");
        assert_eq!(timeout_err.code.as_str(), "durable_effect_event_timeout");
    }

    #[tokio::test]
    async fn durable_emit_process_event_requires_process_and_appends_inside_process() {
        // Without an attached process, emitting must fail.
        let detached = inline_context("call-no-process");
        let detached_err = detached
            .durable_effects()
            .expect("durable effects")
            .emit_process_event("tool.event", serde_json::json!({}))
            .await
            .expect_err("outside process");
        assert_eq!(
            detached_err.code.as_str(),
            "durable_effect_process_event_unavailable"
        );

        // Register a process that declares the "tool.event" event type.
        let registry = Arc::new(crate::TestLocalProcessRegistry::default());
        let process_id = "process:durable-tool-event";
        let declared_event = crate::ProcessEventType {
            name: "tool.event".to_string(),
            payload_schema: crate::LashSchema::any(),
            semantics: crate::ProcessEventSemanticsSpec::default(),
        };
        let registration = crate::ProcessRegistration::new(
            process_id,
            crate::ProcessInput::External {
                metadata: serde_json::json!({}),
            },
            crate::ProcessProvenance::host(),
        )
        .with_extra_event_types([declared_event]);
        registry
            .register_process(registration)
            .await
            .expect("register process");

        let registry_dyn: Arc<dyn crate::ProcessRegistry> = registry;
        let attached = inline_context("call-process-event")
            .with_process_events_for_testing(process_id, registry_dyn);

        // A declared event type is appended and echoed back.
        let event = attached
            .durable_effects()
            .expect("durable effects")
            .emit_process_event("tool.event", serde_json::json!({ "ok": true }))
            .await
            .expect("process event");
        assert_eq!(event.process_id, process_id);
        assert_eq!(event.event_type, "tool.event");
        assert_eq!(event.payload, serde_json::json!({ "ok": true }));
        assert_eq!(
            event.invocation.replay_key(),
            Some("tool:call-process-event:durable-process-event:0")
        );

        // An undeclared event type is rejected at append time.
        let append_err = attached
            .durable_effects()
            .expect("durable effects")
            .emit_process_event("undeclared.event", serde_json::json!({}))
            .await
            .expect_err("undeclared event type must fail the append");
        assert_eq!(
            append_err.code.as_str(),
            "durable_effect_process_event_append_failed"
        );
    }
}