1use std::collections::HashSet;
2use std::future::Future;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5
6use lash_sansio::llm::types::ProviderReplayMeta;
7use serde::{Deserialize, Serialize};
8
9use crate::plugin::{
10 PluginError, SessionGraphService, SessionLifecycleService, SessionSnapshot, SessionStateService,
11};
12use crate::{AttachmentStore, ToolContract, ToolId, ToolManifest, ToolResult};
13
14mod attachments;
15mod direct_completion;
16mod dispatch;
17mod process;
18pub(crate) mod process_events;
19mod session;
20mod triggers;
21
22pub use attachments::ToolAttachmentClient;
23pub use direct_completion::ToolDirectCompletionClient;
24pub use dispatch::ToolDispatchClient;
25pub use process::ToolSessionProcessAdmin;
26pub use process_events::ToolProcessEventClient;
27pub use session::{ToolSessionAdmin, ToolSessionModel};
28pub use triggers::ToolTriggerClient;
29
/// A text message tagged with a string `kind`.
///
/// This is the item type carried by [`ProgressSender`].
#[derive(Debug, Clone)]
pub struct SandboxMessage {
    /// Message body.
    pub text: String,
    /// Label categorising the message. The set of values in use is not
    /// defined in this file.
    pub kind: String,
}
37
38pub type ProgressSender = tokio::sync::mpsc::UnboundedSender<SandboxMessage>;
40
41#[derive(Clone, Default)]
42pub(crate) struct ToolCompletionState {
43 key: Arc<Mutex<Option<crate::AwaitEventKey>>>,
44}
45
46impl ToolCompletionState {
47 fn store(
48 &self,
49 key: crate::AwaitEventKey,
50 ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
51 let mut guard = self.key.lock().map_err(|_| {
52 crate::RuntimeError::new(
53 "tool_completion_state_poisoned",
54 "tool completion key state lock poisoned",
55 )
56 })?;
57 if let Some(existing) = guard.as_ref() {
58 return Ok(existing.clone());
59 }
60 *guard = Some(key.clone());
61 Ok(key)
62 }
63
64 pub(crate) fn take(&self) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
65 self.key.lock().map(|mut guard| guard.take()).map_err(|_| {
66 crate::RuntimeError::new(
67 "tool_completion_state_poisoned",
68 "tool completion key state lock poisoned",
69 )
70 })
71 }
72}
73
/// Bookkeeping for durable effects issued through one tool context.
///
/// Both fields are reference-counted, so clones share the same state.
#[derive(Default, Clone)]
pub(crate) struct ToolDurableEffectState {
    /// Durable step ids that have already been reserved.
    step_ids: Arc<Mutex<HashSet<String>>>,
    /// Counter that hands out process-event sequence numbers; the default
    /// value starts at 0.
    process_event_sequence: Arc<AtomicU64>,
}
79
80impl ToolDurableEffectState {
81 fn reserve_step(&self, step_id: &str) -> Result<(), crate::RuntimeError> {
82 let mut guard = self.step_ids.lock().map_err(|_| {
83 crate::RuntimeError::new(
84 "durable_effect_state_poisoned",
85 "durable effect step state lock poisoned",
86 )
87 })?;
88 if !guard.insert(step_id.to_string()) {
89 return Err(crate::RuntimeError::new(
90 "durable_effect_duplicate_step_id",
91 format!("durable effect step id `{step_id}` was already used by this tool call"),
92 ));
93 }
94 Ok(())
95 }
96
97 fn next_process_event_sequence(&self) -> u64 {
98 self.process_event_sequence.fetch_add(1, Ordering::Relaxed)
99 }
100}
101
102#[derive(Clone)]
105pub struct ToolContext<'run> {
106 pub(crate) session_id: String,
107 pub(crate) agent_frame_id: crate::AgentFrameId,
108 pub(crate) sessions: Arc<dyn SessionStateService>,
109 pub(crate) session_lifecycle: Arc<dyn SessionLifecycleService>,
110 pub(crate) processes: Arc<dyn crate::ProcessService>,
111 pub(crate) process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
112 pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
113 pub(crate) runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
114 pub(crate) runtime_execution_context: Option<crate::RuntimeExecutionContext<'run>>,
115 pub(crate) cancellation_token: Option<tokio_util::sync::CancellationToken>,
116 pub(crate) async_process_id: Option<String>,
117 pub(crate) runtime_process_id: Option<String>,
118 pub(crate) process_events: Option<ToolProcessEventContext>,
119 pub(crate) attachment_store: Arc<dyn AttachmentStore>,
120 pub(crate) direct_completions: crate::DirectCompletionClient<'run>,
121 pub(crate) prepared_payload: serde_json::Value,
122 pub(crate) tool_call_id: Option<String>,
124 pub(crate) attempt_number: u32,
125 pub(crate) max_attempts: u32,
126 pub(crate) replay_key: Option<String>,
127 pub(crate) completion: ToolCompletionState,
128 pub(crate) durable_effects: ToolDurableEffectState,
129 pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
130 pub(crate) execution_env_spec: crate::ProcessExecutionEnvSpec,
131 pub(crate) child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
132}
133
/// Event payload passed to a `ToolChildExecutionTraceHook` callback.
#[derive(Clone)]
pub struct ToolChildProcessStarted {
    /// Id of the process the event is about.
    pub process_id: String,
    /// Optional entry name associated with the child; `None` when not supplied.
    pub child_entry_name: Option<String>,
}
139
140#[derive(Clone)]
141pub struct ToolChildExecutionTraceHook {
142 on_child_process_started: Arc<dyn Fn(ToolChildProcessStarted) + Send + Sync>,
143}
144
145impl ToolChildExecutionTraceHook {
146 pub fn new(
147 on_child_process_started: impl Fn(ToolChildProcessStarted) + Send + Sync + 'static,
148 ) -> Self {
149 Self {
150 on_child_process_started: Arc::new(on_child_process_started),
151 }
152 }
153
154 pub fn child_process_started(&self, event: ToolChildProcessStarted) {
155 (self.on_child_process_started)(event);
156 }
157}
158
159#[derive(Clone)]
160pub(crate) struct ToolProcessEventContext {
161 process_id: String,
162 registry: Arc<dyn crate::ProcessRegistry>,
163 store: Option<Arc<dyn crate::RuntimePersistence>>,
164 session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
165 session_graph: Arc<dyn SessionGraphService>,
166 queued_work_driver: Option<crate::QueuedWorkDriver>,
167}
168
169pub(crate) struct ToolContextBuilder<'run> {
170 session_id: String,
171 agent_frame_id: crate::AgentFrameId,
172 sessions: Arc<dyn SessionStateService>,
173 session_lifecycle: Arc<dyn SessionLifecycleService>,
174 session_graph: Arc<dyn SessionGraphService>,
175 processes: Arc<dyn crate::ProcessService>,
176 process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
177 effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
178 runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
179 runtime_execution_context: Option<crate::RuntimeExecutionContext<'run>>,
180 cancellation_token: Option<tokio_util::sync::CancellationToken>,
181 async_process_id: Option<String>,
182 runtime_process_id: Option<String>,
183 process_events: Option<ToolProcessEventContext>,
184 attachment_store: Arc<dyn AttachmentStore>,
185 direct_completions: crate::DirectCompletionClient<'run>,
186 prepared_payload: serde_json::Value,
187 tool_call_id: Option<String>,
188 completion: ToolCompletionState,
189 durable_effects: ToolDurableEffectState,
190 parent_invocation: Option<crate::RuntimeInvocation>,
191 execution_env_spec: crate::ProcessExecutionEnvSpec,
192 child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
193}
194
195impl<'run> ToolContextBuilder<'run> {
196 pub(crate) fn from_dispatch(
197 dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
198 ) -> Self {
199 Self {
200 session_id: dispatch.session_id.clone(),
201 agent_frame_id: dispatch.agent_frame_id.clone(),
202 sessions: Arc::clone(&dispatch.sessions),
203 session_lifecycle: Arc::clone(&dispatch.session_lifecycle),
204 session_graph: Arc::clone(&dispatch.session_graph),
205 processes: Arc::clone(&dispatch.processes),
206 process_cancel_ability: Arc::clone(&dispatch.process_cancel_ability),
207 effect_controller: dispatch.effect_controller.clone(),
208 runtime_dispatch: Some(Arc::clone(&dispatch)),
209 runtime_execution_context: None,
210 cancellation_token: None,
211 async_process_id: None,
212 runtime_process_id: None,
213 process_events: None,
214 attachment_store: Arc::clone(&dispatch.attachment_store),
215 direct_completions: dispatch.direct_completions.clone(),
216 prepared_payload: serde_json::Value::Null,
217 tool_call_id: None,
218 completion: ToolCompletionState::default(),
219 durable_effects: ToolDurableEffectState::default(),
220 parent_invocation: dispatch.parent_invocation.clone(),
221 execution_env_spec: dispatch.execution_env_spec.clone(),
222 child_execution_trace_hook: None,
223 }
224 }
225
226 #[cfg(any(test, feature = "testing"))]
227 pub(crate) fn tool_call_id(mut self, tool_call_id: impl Into<Option<String>>) -> Self {
228 self.tool_call_id = tool_call_id.into();
229 self
230 }
231
232 pub(crate) fn prepared_call(mut self, call: &PreparedToolCall) -> Self {
233 self.tool_call_id = Some(call.call_id.clone());
234 self.prepared_payload = call.prepared_payload.clone();
235 self
236 }
237
238 pub(crate) fn cancellation_token(
239 mut self,
240 cancellation_token: Option<tokio_util::sync::CancellationToken>,
241 ) -> Self {
242 self.cancellation_token = cancellation_token;
243 self
244 }
245
246 pub(crate) fn runtime_execution_context(
247 mut self,
248 context: crate::RuntimeExecutionContext<'run>,
249 ) -> Self {
250 self.runtime_execution_context = Some(context);
251 self
252 }
253
254 pub(crate) fn runtime_process_id(mut self, process_id: Option<String>) -> Self {
255 self.runtime_process_id = process_id;
256 self
257 }
258
259 pub(crate) fn async_process(
260 mut self,
261 process_id: impl Into<String>,
262 cancellation_token: tokio_util::sync::CancellationToken,
263 ) -> Self {
264 self.async_process_id = Some(process_id.into());
265 self.cancellation_token = Some(cancellation_token);
266 self
267 }
268
269 pub(crate) fn process_events(
270 mut self,
271 process_id: impl Into<String>,
272 registry: Arc<dyn crate::ProcessRegistry>,
273 store: Option<Arc<dyn crate::RuntimePersistence>>,
274 session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
275 queued_work_driver: Option<crate::QueuedWorkDriver>,
276 ) -> Self {
277 self.process_events = Some(ToolProcessEventContext {
278 process_id: process_id.into(),
279 registry,
280 store,
281 session_store_factory,
282 session_graph: Arc::clone(&self.session_graph),
283 queued_work_driver,
284 });
285 self
286 }
287
288 pub(crate) fn parent_invocation(mut self, metadata: Option<crate::RuntimeInvocation>) -> Self {
289 self.parent_invocation = metadata;
290 self
291 }
292
293 pub(crate) fn child_execution_trace_hook(
294 mut self,
295 hook: Option<ToolChildExecutionTraceHook>,
296 ) -> Self {
297 self.child_execution_trace_hook = hook;
298 self
299 }
300
301 pub(crate) fn build(self) -> ToolContext<'run> {
302 ToolContext {
303 session_id: self.session_id,
304 agent_frame_id: self.agent_frame_id,
305 sessions: self.sessions,
306 session_lifecycle: self.session_lifecycle,
307 processes: self.processes,
308 process_cancel_ability: self.process_cancel_ability,
309 effect_controller: self.effect_controller,
310 runtime_dispatch: self.runtime_dispatch,
311 runtime_execution_context: self.runtime_execution_context,
312 cancellation_token: self.cancellation_token,
313 async_process_id: self.async_process_id,
314 runtime_process_id: self.runtime_process_id,
315 process_events: self.process_events,
316 attachment_store: self.attachment_store,
317 direct_completions: self.direct_completions,
318 prepared_payload: self.prepared_payload,
319 tool_call_id: self.tool_call_id,
320 attempt_number: 1,
321 max_attempts: 1,
322 replay_key: None,
323 completion: self.completion,
324 durable_effects: self.durable_effects,
325 parent_invocation: self.parent_invocation,
326 execution_env_spec: self.execution_env_spec,
327 child_execution_trace_hook: self.child_execution_trace_hook,
328 }
329 }
330}
331
332impl<'run> ToolContext<'run> {
333 #[cfg(any(test, feature = "testing"))]
334 #[expect(
335 clippy::too_many_arguments,
336 reason = "testing constructor mirrors the sealed runtime tool context dependencies"
337 )]
338 pub(crate) fn builder(
339 session_id: String,
340 sessions: Arc<dyn SessionStateService>,
341 session_lifecycle: Arc<dyn SessionLifecycleService>,
342 session_graph: Arc<dyn SessionGraphService>,
343 processes: Arc<dyn crate::ProcessService>,
344 process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
345 effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
346 attachment_store: Arc<dyn AttachmentStore>,
347 direct_completions: crate::DirectCompletionClient<'run>,
348 ) -> ToolContextBuilder<'run> {
349 ToolContextBuilder {
350 session_id,
351 agent_frame_id: String::new(),
352 sessions,
353 session_lifecycle,
354 session_graph,
355 processes,
356 process_cancel_ability,
357 effect_controller,
358 runtime_dispatch: None,
359 runtime_execution_context: None,
360 cancellation_token: None,
361 async_process_id: None,
362 runtime_process_id: None,
363 process_events: None,
364 attachment_store,
365 direct_completions,
366 prepared_payload: serde_json::Value::Null,
367 tool_call_id: None,
368 completion: ToolCompletionState::default(),
369 durable_effects: ToolDurableEffectState::default(),
370 parent_invocation: None,
371 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
372 crate::PluginOptions::default(),
373 crate::SessionPolicy::default(),
374 ),
375 child_execution_trace_hook: None,
376 }
377 }
378
379 pub(crate) fn from_dispatch(
380 dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
381 ) -> ToolContextBuilder<'run> {
382 ToolContextBuilder::from_dispatch(dispatch)
383 }
384
385 pub fn session_id(&self) -> &str {
386 &self.session_id
387 }
388
389 pub fn agent_frame_id(&self) -> &str {
390 &self.agent_frame_id
391 }
392
393 pub fn sessions(&self) -> ToolSessionAdmin<'run> {
394 ToolSessionAdmin {
395 session_id: self.session_id.clone(),
396 sessions: Arc::clone(&self.sessions),
397 session_lifecycle: Arc::clone(&self.session_lifecycle),
398 effect_controller: self.effect_controller.clone(),
399 }
400 }
401
402 pub fn dispatch(&self) -> ToolDispatchClient<'run> {
403 ToolDispatchClient {
404 context: self.clone(),
405 }
406 }
407
408 pub fn triggers(&self) -> ToolTriggerClient<'run> {
409 ToolTriggerClient {
410 context: self.clone(),
411 }
412 }
413
414 pub fn processes(&self) -> ToolSessionProcessAdmin<'run> {
415 ToolSessionProcessAdmin {
416 session_id: self.session_id.clone(),
417 agent_frame_id: self.agent_frame_id.clone(),
418 processes: Arc::clone(&self.processes),
419 process_cancel_ability: Arc::clone(&self.process_cancel_ability),
420 effect_controller: self.effect_controller.clone(),
421 parent_invocation: self.parent_invocation.clone(),
422 tool_call_id: self.tool_call_id.clone(),
423 execution_env_spec: self.execution_env_spec.clone(),
424 }
425 }
426
427 pub fn emit_child_process_started(
428 &self,
429 process_id: impl Into<String>,
430 child_entry_name: Option<String>,
431 ) {
432 let Some(hook) = &self.child_execution_trace_hook else {
433 return;
434 };
435 hook.child_process_started(ToolChildProcessStarted {
436 process_id: process_id.into(),
437 child_entry_name,
438 });
439 }
440
441 pub fn direct_completions(&self) -> ToolDirectCompletionClient<'run> {
442 ToolDirectCompletionClient {
443 session_id: self.session_id.clone(),
444 tool_call_id: self.tool_call_id.clone(),
445 direct_completions: self.direct_completions.clone(),
446 parent_invocation: self.parent_invocation.clone(),
447 parent_tool_batch_is_durable: self.parent_invocation.as_ref().is_some_and(
448 |invocation| invocation.effect_kind() == Some(crate::RuntimeEffectKind::ToolBatch),
449 ) && self
450 .effect_controller
451 .controller()
452 .durability_tier()
453 == crate::DurabilityTier::Durable,
454 }
455 }
456
457 pub fn attachments(&self) -> ToolAttachmentClient {
458 ToolAttachmentClient {
459 store: Arc::clone(&self.attachment_store),
460 }
461 }
462
463 pub fn process_events(&self) -> ToolProcessEventClient {
464 ToolProcessEventClient {
465 context: self.process_events.clone(),
466 }
467 }
468
469 pub fn durable_effects(&self) -> Result<ToolDurableEffects<'_, 'run>, crate::RuntimeError> {
476 let Some(tool_call_id) = self.tool_call_id.as_deref() else {
477 return Err(crate::RuntimeError::new(
478 "durable_effects_missing_call_id",
479 "durable effects require a prepared tool call id",
480 ));
481 };
482 if tool_call_id.trim().is_empty() {
483 return Err(crate::RuntimeError::new(
484 "durable_effects_missing_call_id",
485 "durable effects require a non-empty prepared tool call id",
486 ));
487 }
488 let scoped = self.effect_controller.scoped();
489 if !scoped.controller().supports_durable_effects() {
490 return Err(crate::RuntimeError::new(
491 "durable_effects_unavailable",
492 "this effect controller does not support durable tool effects",
493 ));
494 }
495 if self.parent_invocation.as_ref().is_some_and(|invocation| {
496 invocation.effect_kind() == Some(crate::RuntimeEffectKind::ToolBatch)
497 }) && scoped.controller().durability_tier() == crate::DurabilityTier::Durable
498 {
499 return Err(crate::RuntimeError::new(
500 "durable_effects_unavailable_in_tool_batch",
501 "durable tool sub-effects are not available inside a tool batch child",
502 ));
503 }
504 Ok(ToolDurableEffects { context: self })
505 }
506
507 pub fn cancellation_token(&self) -> Option<&tokio_util::sync::CancellationToken> {
508 self.cancellation_token.as_ref()
509 }
510
511 pub fn async_process_id(&self) -> Option<&str> {
512 self.async_process_id.as_deref()
513 }
514
515 pub fn runtime_process_id(&self) -> Option<&str> {
516 self.async_process_id
517 .as_deref()
518 .or(self.runtime_process_id.as_deref())
519 .or_else(|| {
520 self.process_events
521 .as_ref()
522 .map(|context| context.process_id.as_str())
523 })
524 }
525
526 pub fn tool_call_id(&self) -> Option<&str> {
527 self.tool_call_id.as_deref()
528 }
529
530 pub fn prepared_payload(&self) -> &serde_json::Value {
531 &self.prepared_payload
532 }
533
534 pub fn decode_prepared_payload<T>(&self) -> Result<T, serde_json::Error>
535 where
536 T: serde::de::DeserializeOwned,
537 {
538 serde_json::from_value(self.prepared_payload.clone())
539 }
540
541 pub fn attempt_number(&self) -> u32 {
542 self.attempt_number
543 }
544
545 pub fn max_attempts(&self) -> u32 {
546 self.max_attempts
547 }
548
549 pub fn replay_key(&self) -> Option<&str> {
550 self.replay_key.as_deref()
551 }
552
553 pub async fn completion_key(&self) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
567 let tool_call_id = self.tool_call_id.clone().ok_or_else(|| {
568 crate::RuntimeError::new(
569 "tool_completion_key_missing_call_id",
570 "completion keys require a prepared tool call id",
571 )
572 })?;
573 let scoped = self.effect_controller.scoped();
574 let key = scoped
575 .controller()
576 .await_event_key(
577 scoped.execution_scope(),
578 crate::AwaitEventWaitIdentity::tool_completion(tool_call_id),
579 )
580 .await?;
581 self.completion.store(key)
582 }
583
584 pub(crate) fn take_completion_key(
585 &self,
586 ) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
587 self.completion.take()
588 }
589
590 pub fn with_async_process(
591 mut self,
592 process_id: impl Into<String>,
593 cancellation_token: tokio_util::sync::CancellationToken,
594 ) -> Self {
595 self.async_process_id = Some(process_id.into());
596 self.runtime_process_id = self.async_process_id.clone();
597 self.cancellation_token = Some(cancellation_token);
598 self
599 }
600
601 #[cfg(any(test, feature = "testing"))]
602 #[doc(hidden)]
603 pub fn with_process_events_for_testing(
604 mut self,
605 process_id: impl Into<String>,
606 registry: Arc<dyn crate::ProcessRegistry>,
607 ) -> Self {
608 self.process_events = Some(ToolProcessEventContext {
609 process_id: process_id.into(),
610 registry,
611 store: None,
612 session_store_factory: None,
613 session_graph: Arc::new(crate::plugin::NoopSessionManager),
614 queued_work_driver: None,
615 });
616 self
617 }
618
619 pub(crate) fn with_retry_context(
620 mut self,
621 tool_name: &str,
622 attempt_number: u32,
623 max_attempts: u32,
624 ) -> Self {
625 self.attempt_number = attempt_number.max(1);
626 self.max_attempts = max_attempts.max(1);
627 self.replay_key = self
628 .tool_call_id
629 .as_ref()
630 .map(|call_id| format!("lash-tool:{}:{call_id}:{tool_name}", self.session_id));
631 self
632 }
633
634 pub(crate) fn with_prepared_payload(mut self, payload: serde_json::Value) -> Self {
635 self.prepared_payload = payload;
636 self
637 }
638
639 #[cfg(any(test, feature = "testing"))]
642 #[doc(hidden)]
643 #[expect(
644 clippy::too_many_arguments,
645 reason = "test-only constructor mirrors the sealed runtime tool context"
646 )]
647 pub fn __for_testing(
648 session_id: String,
649 sessions: Arc<dyn SessionStateService>,
650 session_lifecycle: Arc<dyn SessionLifecycleService>,
651 session_graph: Arc<dyn SessionGraphService>,
652 processes: Arc<dyn crate::ProcessService>,
653 attachment_store: Arc<dyn AttachmentStore>,
654 direct_completions: crate::DirectCompletionClient<'static>,
655 tool_call_id: Option<String>,
656 ) -> ToolContext<'static> {
657 ToolContext::builder(
658 session_id,
659 sessions,
660 session_lifecycle,
661 session_graph,
662 processes,
663 Arc::new(crate::DefaultProcessCancelAbility),
664 crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
665 crate::InlineRuntimeEffectController,
666 )),
667 attachment_store,
668 direct_completions,
669 )
670 .tool_call_id(tool_call_id)
671 .build()
672 }
673
674 #[cfg(any(test, feature = "testing"))]
678 #[doc(hidden)]
679 #[expect(
680 clippy::too_many_arguments,
681 reason = "test-only constructor mirrors the sealed runtime context"
682 )]
683 pub fn __for_testing_with_process_cancel_ability(
684 session_id: String,
685 sessions: Arc<dyn SessionStateService>,
686 session_lifecycle: Arc<dyn SessionLifecycleService>,
687 session_graph: Arc<dyn SessionGraphService>,
688 processes: Arc<dyn crate::ProcessService>,
689 process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
690 attachment_store: Arc<dyn AttachmentStore>,
691 direct_completions: crate::DirectCompletionClient<'static>,
692 tool_call_id: Option<String>,
693 ) -> ToolContext<'static> {
694 ToolContext::builder(
695 session_id,
696 sessions,
697 session_lifecycle,
698 session_graph,
699 processes,
700 process_cancel_ability,
701 crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
702 crate::InlineRuntimeEffectController,
703 )),
704 attachment_store,
705 direct_completions,
706 )
707 .tool_call_id(tool_call_id)
708 .build()
709 }
710}
711
712pub struct ToolDurableEffects<'ctx, 'run> {
718 context: &'ctx ToolContext<'run>,
719}
720
721impl<'ctx, 'run> ToolDurableEffects<'ctx, 'run> {
722 pub async fn run_json<F, Fut>(
723 &self,
724 step_id: impl Into<String>,
725 input: serde_json::Value,
726 run: F,
727 ) -> Result<serde_json::Value, crate::RuntimeError>
728 where
729 F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
730 Fut: Future<Output = Result<serde_json::Value, crate::RuntimeError>> + Send + 'run,
731 {
732 let step_id = step_id.into();
733 if step_id.trim().is_empty() {
734 return Err(crate::RuntimeError::new(
735 "durable_effect_empty_step_id",
736 "durable effect step id must be non-empty",
737 ));
738 }
739 self.context.durable_effects.reserve_step(&step_id)?;
740 let invocation = self.step_invocation(
741 format!("durable-step:{step_id}"),
742 crate::RuntimeEffectKind::DurableStep,
743 format!("durable-step:{step_id}"),
744 )?;
745 let outcome = self
746 .context
747 .effect_controller
748 .controller()
749 .execute_effect(
750 crate::RuntimeEffectEnvelope::new(
751 invocation,
752 crate::RuntimeEffectCommand::DurableStep {
753 step_id,
754 input: input.clone(),
755 },
756 ),
757 crate::RuntimeEffectLocalExecutor::durable_step(run),
758 )
759 .await
760 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
761 outcome
762 .into_durable_step()
763 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)
764 }
765
766 pub async fn external_event_key(
767 &self,
768 key: impl Into<String>,
769 ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
770 let key = key.into();
771 if key.trim().is_empty() {
772 return Err(crate::RuntimeError::new(
773 "durable_effect_empty_event_key",
774 "durable effect external event key must be non-empty",
775 ));
776 }
777 let scoped = self.context.effect_controller.scoped();
778 scoped
779 .controller()
780 .await_event_key(
781 scoped.execution_scope(),
782 crate::AwaitEventWaitIdentity::Custom { key },
783 )
784 .await
785 }
786
787 pub async fn await_event_json(
788 &self,
789 key: crate::AwaitEventKey,
790 ) -> Result<serde_json::Value, crate::RuntimeError> {
791 let invocation = self.step_invocation(
792 format!("await-event:{}", key.key_id),
793 crate::RuntimeEffectKind::AwaitEvent,
794 format!("await-event:{}", key.key_id),
795 )?;
796 let cancellation = self.context.cancellation_token.clone().unwrap_or_default();
797 let clock = self
798 .context
799 .runtime_dispatch
800 .as_ref()
801 .map(|dispatch| Arc::clone(&dispatch.clock))
802 .unwrap_or_else(|| Arc::new(crate::SystemClock));
803 let outcome = self
804 .context
805 .effect_controller
806 .controller()
807 .execute_effect(
808 crate::RuntimeEffectEnvelope::new(
809 invocation,
810 crate::RuntimeEffectCommand::AwaitEvent { key },
811 ),
812 crate::RuntimeEffectLocalExecutor::await_event_with_clock(
813 cancellation,
814 None,
815 clock,
816 ),
817 )
818 .await
819 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
820 match outcome
821 .into_await_event()
822 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?
823 {
824 crate::Resolution::Ok(value) => Ok(value),
825 crate::Resolution::Err(err) => Err(crate::RuntimeError::new(err.code, err.message)),
826 crate::Resolution::Timeout => Err(crate::RuntimeError::new(
827 "durable_effect_event_timeout",
828 "durable effect external event wait timed out",
829 )),
830 crate::Resolution::Cancelled => Err(crate::RuntimeError::new(
831 "durable_effect_event_cancelled",
832 "durable effect external event wait was cancelled",
833 )),
834 }
835 }
836
837 pub async fn emit_process_event(
838 &self,
839 event_type: impl Into<String>,
840 payload: serde_json::Value,
841 ) -> Result<crate::ProcessEvent, crate::RuntimeError> {
842 let Some(process) = self.context.process_events.as_ref() else {
843 return Err(crate::RuntimeError::new(
844 "durable_effect_process_event_unavailable",
845 "durable effect process events are unavailable outside a durable process",
846 ));
847 };
848 let event_type = event_type.into();
849 if event_type.trim().is_empty() {
850 return Err(crate::RuntimeError::new(
851 "durable_effect_empty_process_event_type",
852 "durable effect process event type must be non-empty",
853 ));
854 }
855 let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
856 crate::RuntimeError::new(
857 "durable_effects_missing_call_id",
858 "durable effects require a prepared tool call id",
859 )
860 })?;
861 let sequence = self.context.durable_effects.next_process_event_sequence();
862 let request = crate::ProcessEventAppendRequest::new(event_type, payload).with_replay_key(
863 format!("tool:{tool_call_id}:durable-process-event:{sequence}"),
864 );
865 self.context
866 .process_events()
867 .emit_request(request)
868 .await
869 .map_err(|err| {
870 crate::RuntimeError::new(
871 "durable_effect_process_event_append_failed",
872 err.to_string(),
873 )
874 })
875 .and_then(|event| {
876 if event.process_id == process.process_id {
877 Ok(event)
878 } else {
879 Err(crate::RuntimeError::new(
880 "durable_effect_process_event_process_mismatch",
881 "process event append returned an event for a different process",
882 ))
883 }
884 })
885 }
886
887 fn step_invocation(
888 &self,
889 effect_id_suffix: impl Into<String>,
890 kind: crate::RuntimeEffectKind,
891 replay_suffix: impl AsRef<str>,
892 ) -> Result<crate::RuntimeInvocation, crate::RuntimeError> {
893 let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
894 crate::RuntimeError::new(
895 "durable_effects_missing_call_id",
896 "durable effects require a prepared tool call id",
897 )
898 })?;
899 let effect_id_suffix = effect_id_suffix.into();
900 if let Some(parent) = self.context.parent_invocation.as_ref() {
901 return Ok(crate::runtime::causal::child_effect_invocation(
902 parent,
903 format!("{tool_call_id}:{effect_id_suffix}"),
904 kind,
905 replay_suffix,
906 ));
907 }
908 let scoped = self.context.effect_controller.scoped();
909 let replay_key = format!(
910 "{}:tool:{tool_call_id}:{}",
911 scoped.scope_id(),
912 replay_suffix.as_ref()
913 );
914 Ok(crate::RuntimeInvocation::effect(
915 crate::RuntimeScope::new(self.context.session_id.clone()),
916 format!("{tool_call_id}:{effect_id_suffix}"),
917 kind,
918 replay_key,
919 ))
920 }
921}
922
923#[derive(Clone, Debug, Serialize, Deserialize)]
929pub struct PreparedToolCall {
930 pub call_id: String,
931 pub tool_id: ToolId,
932 pub tool_name: String,
933 pub args: serde_json::Value,
934 #[serde(default, skip_serializing_if = "Option::is_none")]
935 pub replay: Option<ProviderReplayMeta>,
936 #[serde(default, skip_serializing_if = "serde_json::Value::is_null")]
937 pub prepared_payload: serde_json::Value,
938}
939
940impl PreparedToolCall {
941 pub fn identity(tool_id: ToolId, call: crate::sansio::PendingToolCall) -> Self {
942 Self {
943 call_id: call.call_id,
944 tool_id,
945 tool_name: call.tool_name,
946 args: call.args,
947 replay: call.replay,
948 prepared_payload: serde_json::Value::Null,
949 }
950 }
951
952 pub fn from_parts(
953 call_id: impl Into<String>,
954 tool_id: impl Into<ToolId>,
955 tool_name: impl Into<String>,
956 args: serde_json::Value,
957 replay: Option<ProviderReplayMeta>,
958 prepared_payload: serde_json::Value,
959 ) -> Self {
960 Self {
961 call_id: call_id.into(),
962 tool_id: tool_id.into(),
963 tool_name: tool_name.into(),
964 args,
965 replay,
966 prepared_payload,
967 }
968 }
969}
970
971#[derive(Clone, Debug, Serialize, Deserialize)]
977pub struct PreparedToolBatchCall {
978 pub call: PreparedToolCall,
979 pub replay_suffix: String,
980}
981
982#[derive(Clone, Debug, Serialize, Deserialize)]
988pub struct PreparedToolBatch {
989 pub batch_id: String,
990 pub calls: Vec<PreparedToolBatchCall>,
991}
992
993impl PreparedToolBatch {
994 pub fn new(batch_id: impl Into<String>, calls: Vec<PreparedToolCall>) -> Self {
995 let batch_id = batch_id.into();
996 let calls = calls
997 .into_iter()
998 .enumerate()
999 .map(|(index, call)| PreparedToolBatchCall {
1000 replay_suffix: format!("child:{index}:{}", call.call_id),
1001 call,
1002 })
1003 .collect();
1004 Self { batch_id, calls }
1005 }
1006
1007 pub fn is_empty(&self) -> bool {
1008 self.calls.is_empty()
1009 }
1010
1011 pub fn len(&self) -> usize {
1012 self.calls.len()
1013 }
1014}
1015
1016#[derive(Clone)]
1017pub struct ToolPrepareContext {
1018 session_id: String,
1019 sessions: Arc<dyn SessionStateService>,
1020 turn_context: crate::TurnContext,
1021 tool_call_id: Option<String>,
1022}
1023
1024impl ToolPrepareContext {
1025 pub(crate) fn new(
1026 session_id: String,
1027 sessions: Arc<dyn SessionStateService>,
1028 turn_context: crate::TurnContext,
1029 tool_call_id: Option<String>,
1030 ) -> Self {
1031 Self {
1032 session_id,
1033 sessions,
1034 turn_context,
1035 tool_call_id,
1036 }
1037 }
1038
1039 pub fn session_id(&self) -> &str {
1040 &self.session_id
1041 }
1042
1043 pub fn tool_call_id(&self) -> Option<&str> {
1044 self.tool_call_id.as_deref()
1045 }
1046
1047 pub fn turn_context(&self) -> &crate::TurnContext {
1048 &self.turn_context
1049 }
1050
1051 pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
1052 where
1053 T: 'static,
1054 {
1055 self.turn_context.plugin_input::<T>(plugin_id)
1056 }
1057
1058 pub async fn session_snapshot(&self) -> Result<SessionSnapshot, PluginError> {
1059 self.sessions.snapshot_session(&self.session_id).await
1060 }
1061
1062 pub async fn tool_catalog(&self) -> Result<Vec<serde_json::Value>, PluginError> {
1063 self.sessions.tool_catalog(&self.session_id).await
1064 }
1065
1066 pub async fn shared_tool_catalog(
1067 &self,
1068 ) -> Result<std::sync::Arc<Vec<serde_json::Value>>, PluginError> {
1069 self.sessions.shared_tool_catalog(&self.session_id).await
1070 }
1071}
1072
/// Input handed to [`ToolProvider::prepare_tool_call`].
pub struct ToolPrepareCall<'a> {
    /// Id of the tool; the default preparation forwards it unchanged.
    pub tool_id: ToolId,
    /// The pending call to prepare; the default preparation forwards it unchanged.
    pub pending: crate::sansio::PendingToolCall,
    /// Prepare-time context, borrowed for the duration of the call.
    pub context: &'a ToolPrepareContext,
}
1079
/// Borrowed arguments for a single [`ToolProvider::execute`] invocation.
pub struct ToolCall<'a> {
    /// Tool name; `execute_by_id` fills this from the resolved manifest's `name`.
    pub name: &'a str,
    /// JSON arguments for the tool.
    pub args: &'a serde_json::Value,
    /// Execution context for this call.
    pub context: &'a ToolContext<'a>,
    /// Optional channel for sending `SandboxMessage` progress updates.
    pub progress: Option<&'a ProgressSender>,
}
1092
1093#[async_trait::async_trait]
1101pub trait ToolProvider: Send + Sync + 'static {
1102 fn tool_manifests(&self) -> Vec<ToolManifest>;
1103 fn resolve_manifest(&self, name: &str) -> Option<ToolManifest> {
1104 self.tool_manifests()
1105 .into_iter()
1106 .find(|manifest| manifest.name == name)
1107 }
1108 fn resolve_manifest_by_id(&self, id: &ToolId) -> Option<ToolManifest> {
1109 self.tool_manifests()
1110 .into_iter()
1111 .find(|manifest| manifest.id == *id)
1112 }
1113 fn resolve_contract(&self, name: &str) -> Option<Arc<ToolContract>>;
1114 fn resolve_contract_by_id(&self, id: &ToolId) -> Option<Arc<ToolContract>> {
1115 let manifest = self.resolve_manifest_by_id(id)?;
1116 self.resolve_contract(&manifest.name)
1117 }
1118 async fn prepare_tool_call(
1119 &self,
1120 call: ToolPrepareCall<'_>,
1121 ) -> Result<PreparedToolCall, ToolResult> {
1122 Ok(PreparedToolCall::identity(call.tool_id, call.pending))
1123 }
1124 async fn execute(&self, call: ToolCall<'_>) -> ToolResult;
1125 async fn execute_by_id(
1126 &self,
1127 tool_id: &ToolId,
1128 args: &serde_json::Value,
1129 context: &ToolContext<'_>,
1130 progress: Option<&ProgressSender>,
1131 ) -> ToolResult {
1132 let Some(manifest) = self.resolve_manifest_by_id(tool_id) else {
1133 return ToolResult::err_fmt(format!("Unknown tool id: {tool_id}"));
1134 };
1135 self.execute(ToolCall {
1136 name: &manifest.name,
1137 args,
1138 context,
1139 progress,
1140 })
1141 .await
1142 }
1143}
1144
1145#[cfg(test)]
1146mod tests {
1147 use super::*;
1148 use crate::ProcessRegistry;
1149 use crate::RuntimeEffectController;
1150 use std::sync::atomic::{AtomicU64, Ordering};
1151
1152 struct NoDurableEffectController;
1153
1154 #[async_trait::async_trait]
1155 impl crate::RuntimeEffectController for NoDurableEffectController {
1156 async fn execute_effect(
1157 &self,
1158 _envelope: crate::RuntimeEffectEnvelope,
1159 _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
1160 ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
1161 Err(crate::RuntimeEffectControllerError::new(
1162 "unexpected_effect",
1163 "test controller should not execute effects",
1164 ))
1165 }
1166 }
1167
1168 fn test_context_with_controller(
1169 tool_call_id: Option<String>,
1170 controller: Arc<dyn crate::RuntimeEffectController>,
1171 ) -> ToolContext<'static> {
1172 ToolContext::builder(
1173 "session-1".to_string(),
1174 Arc::new(crate::testing::MockSessionManager::default()),
1175 Arc::new(crate::testing::MockSessionManager::default()),
1176 Arc::new(crate::testing::MockSessionManager::default()),
1177 Arc::new(crate::UnavailableProcessService),
1178 Arc::new(crate::DefaultProcessCancelAbility),
1179 crate::runtime::RuntimeEffectControllerHandle::shared(controller),
1180 Arc::new(crate::InMemoryAttachmentStore::new()),
1181 crate::DirectCompletionClient::unavailable(
1182 "direct completions are unavailable in this test context",
1183 ),
1184 )
1185 .tool_call_id(tool_call_id)
1186 .build()
1187 }
1188
1189 #[test]
1190 fn tool_context_builder_carries_call_payload_and_cancellation_state() {
1191 let cancellation = tokio_util::sync::CancellationToken::new();
1192 let prepared = PreparedToolCall::from_parts(
1193 "call-1",
1194 "tool:demo_tool",
1195 "demo_tool",
1196 serde_json::json!({ "input": true }),
1197 None,
1198 serde_json::json!({ "prepared": true }),
1199 );
1200
1201 let context = ToolContext::builder(
1202 "session-1".to_string(),
1203 Arc::new(crate::testing::MockSessionManager::default()),
1204 Arc::new(crate::testing::MockSessionManager::default()),
1205 Arc::new(crate::testing::MockSessionManager::default()),
1206 Arc::new(crate::UnavailableProcessService),
1207 Arc::new(crate::DefaultProcessCancelAbility),
1208 crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
1209 crate::InlineRuntimeEffectController,
1210 )),
1211 Arc::new(crate::InMemoryAttachmentStore::new()),
1212 crate::DirectCompletionClient::unavailable(
1213 "direct completions are unavailable in this test context",
1214 ),
1215 )
1216 .prepared_call(&prepared)
1217 .cancellation_token(Some(cancellation.clone()))
1218 .async_process("process-1", cancellation.clone())
1219 .build();
1220
1221 assert_eq!(context.session_id(), "session-1");
1222 assert_eq!(context.tool_call_id(), Some("call-1"));
1223 assert_eq!(
1224 context.prepared_payload(),
1225 &serde_json::json!({ "prepared": true })
1226 );
1227 assert_eq!(context.async_process_id(), Some("process-1"));
1228 assert!(context.cancellation_token().is_some());
1229 }
1230
1231 #[test]
1232 fn durable_effects_requires_prepared_call_id_and_supporting_controller() {
1233 let missing_call =
1234 test_context_with_controller(None, Arc::new(crate::InlineRuntimeEffectController));
1235 let err = match missing_call.durable_effects() {
1236 Ok(_) => panic!("missing prepared tool call id should fail"),
1237 Err(err) => err,
1238 };
1239 assert_eq!(err.code.as_str(), "durable_effects_missing_call_id");
1240
1241 let unsupported = test_context_with_controller(
1242 Some("call-1".to_string()),
1243 Arc::new(NoDurableEffectController),
1244 );
1245 let err = match unsupported.durable_effects() {
1246 Ok(_) => panic!("unsupported controller should fail"),
1247 Err(err) => err,
1248 };
1249 assert_eq!(err.code.as_str(), "durable_effects_unavailable");
1250 }
1251
    /// `run_json` must hand the input to the closure and return its output, and
    /// must surface a closure error with its code and message intact.
    #[tokio::test]
    async fn durable_run_json_executes_and_maps_closure_errors() {
        let context = test_context_with_controller(
            Some("call-run-json".to_string()),
            Arc::new(crate::InlineRuntimeEffectController),
        );
        let durable = context.durable_effects().expect("durable effects");

        // Success path: the closure echoes the `x` field of its input.
        let value = durable
            .run_json(
                "create",
                serde_json::json!({ "x": 1 }),
                |input| async move { Ok(serde_json::json!({ "seen": input["x"] })) },
            )
            .await
            .expect("durable step");
        assert_eq!(value, serde_json::json!({ "seen": 1 }));

        // Failure path: the closure's `RuntimeError` comes back unchanged.
        let err = durable
            .run_json("fail", serde_json::json!({}), |_| async {
                Err(crate::RuntimeError::new(
                    "durable_step_failed",
                    "step failed",
                ))
            })
            .await
            .expect_err("closure error");
        assert_eq!(err.code.as_str(), "durable_step_failed");
        assert_eq!(err.message, "step failed");
    }
1281
    /// An empty step id and a repeated step id must both be rejected without
    /// invoking the closure; `runs` counts actual closure executions.
    #[tokio::test]
    async fn durable_run_json_rejects_empty_or_duplicate_step_ids_before_running() {
        let context = test_context_with_controller(
            Some("call-step-ids".to_string()),
            Arc::new(crate::InlineRuntimeEffectController),
        );
        let durable = context.durable_effects().expect("durable effects");
        let runs = Arc::new(AtomicU64::new(0));

        // Empty step id: rejected, and the closure never ran (counter stays 0).
        let err = durable
            .run_json("", serde_json::Value::Null, {
                let runs = Arc::clone(&runs);
                move |_| async move {
                    runs.fetch_add(1, Ordering::Relaxed);
                    Ok(serde_json::Value::Null)
                }
            })
            .await
            .expect_err("empty step id");
        assert_eq!(err.code.as_str(), "durable_effect_empty_step_id");
        assert_eq!(runs.load(Ordering::Relaxed), 0);

        // First use of "same" succeeds and runs the closure once.
        durable
            .run_json("same", serde_json::Value::Null, {
                let runs = Arc::clone(&runs);
                move |_| async move {
                    runs.fetch_add(1, Ordering::Relaxed);
                    Ok(serde_json::Value::Null)
                }
            })
            .await
            .expect("first step");

        // Second use of "same": rejected, and the counter is still 1.
        let err = durable
            .run_json("same", serde_json::Value::Null, {
                let runs = Arc::clone(&runs);
                move |_| async move {
                    runs.fetch_add(1, Ordering::Relaxed);
                    Ok(serde_json::Value::Null)
                }
            })
            .await
            .expect_err("duplicate step id");
        assert_eq!(err.code.as_str(), "durable_effect_duplicate_step_id");
        assert_eq!(runs.load(Ordering::Relaxed), 1);
    }
1327
    /// Requesting an external event key twice with the same name must return
    /// equal keys, and the key's wait identity must be `Custom` with that name.
    #[tokio::test]
    async fn durable_external_event_key_is_custom_and_stable() {
        let context = test_context_with_controller(
            Some("call-event-key".to_string()),
            Arc::new(crate::InlineRuntimeEffectController),
        );
        let durable = context.durable_effects().expect("durable effects");
        let first = durable
            .external_event_key("tool-event-stable")
            .await
            .expect("first key");
        let second = durable
            .external_event_key("tool-event-stable")
            .await
            .expect("second key");

        // Stability: both requests produced the same key.
        assert_eq!(first, second);
        // Identity: the wait is the custom key that was passed in.
        assert_eq!(
            first.wait,
            crate::AwaitEventWaitIdentity::Custom {
                key: "tool-event-stable".to_string()
            }
        );
    }
1352
    /// Each terminal `Resolution` variant must map to the expected
    /// `await_event_json` outcome. Every event is resolved on the controller
    /// before it is awaited.
    #[tokio::test]
    async fn durable_await_event_json_maps_terminal_resolutions() {
        let controller = Arc::new(crate::InlineRuntimeEffectController);
        let context =
            test_context_with_controller(Some("call-await-event".to_string()), controller.clone());
        let durable = context.durable_effects().expect("durable effects");

        // `Resolution::Ok` -> the JSON value is returned as-is.
        let ok_key = durable
            .external_event_key("tool-event-ok")
            .await
            .expect("ok key");
        controller
            .resolve_await_event(
                &ok_key,
                crate::Resolution::Ok(serde_json::json!({ "answer": 42 })),
            )
            .await
            .expect("resolve ok");
        let value = durable
            .await_event_json(ok_key)
            .await
            .expect("await ok value");
        assert_eq!(value, serde_json::json!({ "answer": 42 }));

        // `Resolution::Err` -> the external error code is preserved.
        let err_key = durable
            .external_event_key("tool-event-err")
            .await
            .expect("err key");
        controller
            .resolve_await_event(
                &err_key,
                crate::Resolution::Err(crate::ExternalCompletionError::new(
                    "external_bad",
                    "external failed",
                )),
            )
            .await
            .expect("resolve err");
        let err = durable
            .await_event_json(err_key)
            .await
            .expect_err("await err value");
        assert_eq!(err.code.as_str(), "external_bad");

        // `Resolution::Cancelled` -> `durable_effect_event_cancelled`.
        let cancelled_key = durable
            .external_event_key("tool-event-cancelled")
            .await
            .expect("cancelled key");
        controller
            .resolve_await_event(&cancelled_key, crate::Resolution::Cancelled)
            .await
            .expect("resolve cancelled");
        let err = durable
            .await_event_json(cancelled_key)
            .await
            .expect_err("await cancelled value");
        assert_eq!(err.code.as_str(), "durable_effect_event_cancelled");

        // `Resolution::Timeout` -> `durable_effect_event_timeout`.
        let timeout_key = durable
            .external_event_key("tool-event-timeout")
            .await
            .expect("timeout key");
        controller
            .resolve_await_event(&timeout_key, crate::Resolution::Timeout)
            .await
            .expect("resolve timeout");
        let err = durable
            .await_event_json(timeout_key)
            .await
            .expect_err("await timeout value");
        assert_eq!(err.code.as_str(), "durable_effect_event_timeout");
    }
1425
    /// `emit_process_event` must fail on a context with no process events
    /// attached, succeed for a declared event type once a process is attached,
    /// and fail the append for an undeclared event type.
    #[tokio::test]
    async fn durable_emit_process_event_requires_process_and_appends_inside_process() {
        // Plain test context: nothing attaches process events here.
        let context = test_context_with_controller(
            Some("call-no-process".to_string()),
            Arc::new(crate::InlineRuntimeEffectController),
        );
        let err = context
            .durable_effects()
            .expect("durable effects")
            .emit_process_event("tool.event", serde_json::json!({}))
            .await
            .expect_err("outside process");
        assert_eq!(
            err.code.as_str(),
            "durable_effect_process_event_unavailable"
        );

        // Register a process that declares "tool.event" as an extra event type.
        let registry = Arc::new(crate::TestLocalProcessRegistry::default());
        let process_id = "process:durable-tool-event";
        registry
            .register_process(
                crate::ProcessRegistration::new(
                    process_id,
                    crate::ProcessInput::External {
                        metadata: serde_json::json!({}),
                    },
                    crate::ProcessProvenance::host(),
                )
                .with_extra_event_types([crate::ProcessEventType {
                    name: "tool.event".to_string(),
                    payload_schema: crate::LashSchema::any(),
                    semantics: crate::ProcessEventSemanticsSpec::default(),
                }]),
            )
            .await
            .expect("register process");
        let registry_dyn: Arc<dyn crate::ProcessRegistry> = registry;

        // Same kind of context, now attached to the registered process.
        let context = test_context_with_controller(
            Some("call-process-event".to_string()),
            Arc::new(crate::InlineRuntimeEffectController),
        )
        .with_process_events_for_testing(process_id, registry_dyn);

        // Declared event type: appended, with a replay key built from the
        // tool call id and index 0.
        let event = context
            .durable_effects()
            .expect("durable effects")
            .emit_process_event("tool.event", serde_json::json!({ "ok": true }))
            .await
            .expect("process event");
        assert_eq!(event.process_id, process_id);
        assert_eq!(event.event_type, "tool.event");
        assert_eq!(event.payload, serde_json::json!({ "ok": true }));
        assert_eq!(
            event.invocation.replay_key(),
            Some("tool:call-process-event:durable-process-event:0")
        );

        // Undeclared event type: the append is rejected.
        let append_err = context
            .durable_effects()
            .expect("durable effects")
            .emit_process_event("undeclared.event", serde_json::json!({}))
            .await
            .expect_err("undeclared event type must fail the append");
        assert_eq!(
            append_err.code.as_str(),
            "durable_effect_process_event_append_failed"
        );
    }
1494}