Skip to main content

lash_core/session/
execution_context.rs

1use std::sync::Arc;
2
3use tokio::sync::mpsc::Sender;
4use tokio_util::sync::CancellationToken;
5
6use crate::tool_dispatch::ToolDispatchContext;
7use crate::{TurnActivity, TurnActivityId, TurnEvent};
8
9#[derive(Clone)]
10pub struct RuntimeExecutionContext<'run> {
11    pub(super) session_id: String,
12    pub(super) dispatch: Arc<ToolDispatchContext<'run>>,
13    process_env_store: Arc<dyn crate::ProcessExecutionEnvStore>,
14    attachment_store: Arc<dyn crate::AttachmentStore>,
15    chronological_projection: Arc<crate::ChronologicalProjection>,
16    protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
17    turn_context: crate::TurnContext,
18    execution_env_spec: crate::ProcessExecutionEnvSpec,
19    process_originator: Option<crate::ProcessOriginator>,
20    pub(super) runtime_process_id: Option<String>,
21    pub(super) process_event_context: Option<RuntimeExecutionProcessEventContext>,
22    process_env_ref: Option<crate::ProcessExecutionEnvRef>,
23    process_wake_target: Option<crate::SessionScope>,
24    pub(super) parent_invocation: Option<crate::RuntimeInvocation>,
25    turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
26    pub(super) turn_event_tx: Option<Sender<TurnActivity>>,
27    pub(super) cancellation_token: Option<CancellationToken>,
28    /// Per-tool trace emission handle for this execution. Present only when the
29    /// host installed a trace sink; `None` keeps every trace call a no-op.
30    tracing: Option<RuntimeExecutionTracing>,
31    /// Graph key of the enclosing code block, stamped onto the per-tool
32    /// `TurnEvent`s emitted from this context so consumers can attribute a tool
33    /// call to its code block without ordering heuristics. `None` when the
34    /// context is not executing a code block.
35    code_block_graph_key: Option<String>,
36    /// Call id of the parent `batch` tool call when this context runs the
37    /// children of a batch dispatch, stamped onto child `TurnEvent`s. `None`
38    /// for top-level tool execution.
39    batch_parent_call_id: Option<String>,
40    /// Work-driver handle for this execution's process wiring, when the
41    /// deployment provides one. Threaded through so in-run process
42    /// operations (e.g. signalling another process) that build their own
43    /// `RuntimeEffectLocalExecutor::processes(..)` call can hand it along
44    /// instead of falling back to hub-less backoff polling.
45    process_work_driver: Option<crate::ProcessWorkDriver>,
46    /// Process ids started by THIS execution context. Possession of a handle
47    /// the run itself created is sufficient capability to await/cancel it —
48    /// run-local children are not session handle grants (the ephemeral
49    /// execution scope must never appear in durable grant state).
50    started_process_ids: Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
51}
52
53#[derive(Clone)]
54pub(super) struct RuntimeExecutionProcessEventContext {
55    pub process_id: String,
56    pub registry: Arc<dyn crate::ProcessRegistry>,
57    pub awaiter: crate::ProcessAwaiter,
58    pub store: Option<Arc<dyn crate::RuntimePersistence>>,
59    pub session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
60    pub queued_work_driver: Option<crate::QueuedWorkDriver>,
61}
62
63/// Trace-sink handle threaded into tool execution so per-tool trace events are
64/// emitted from the single shared seam, whichever protocol drives the turn.
65///
66/// `scope_context` carries the turn-scoped identity (session / turn / iteration)
67/// so [`crate::trace::assign_span_identity`] stamps `tool:<call_id>` under the
68/// right turn; `base_context` carries the host's run-level trace context.
69#[derive(Clone)]
70pub(crate) struct RuntimeExecutionTracing {
71    sink: Arc<dyn lash_trace::TraceSink>,
72    base_context: lash_trace::TraceContext,
73    scope_context: lash_trace::TraceContext,
74}
75
76impl RuntimeExecutionTracing {
77    pub(crate) fn new(
78        sink: Arc<dyn lash_trace::TraceSink>,
79        base_context: lash_trace::TraceContext,
80        scope_context: lash_trace::TraceContext,
81    ) -> Self {
82        Self {
83            sink,
84            base_context,
85            scope_context,
86        }
87    }
88
89    fn emit(&self, event: lash_trace::TraceEvent, clock: &dyn crate::Clock) {
90        crate::trace::emit_trace(
91            &Some(Arc::clone(&self.sink)),
92            &self.base_context,
93            self.scope_context.clone(),
94            event,
95            clock,
96        );
97    }
98}
99
100impl<'run> RuntimeExecutionContext<'run> {
101    pub(crate) fn drain_tool_trigger_outcomes(
102        &self,
103    ) -> Result<Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>, crate::PluginError> {
104        self.dispatch
105            .trigger_outcomes
106            .drain()
107            .map_err(crate::PluginError::Session)
108    }
109
110    pub(super) fn process_scope(
111        &self,
112        parent_invocation: Option<crate::RuntimeInvocation>,
113    ) -> crate::ProcessOpScope<'_> {
114        crate::ProcessOpScope::new(self.dispatch.effect_controller.scoped())
115            .with_parent_invocation(parent_invocation)
116            .with_agent_frame_id(Some(self.dispatch.agent_frame_id.clone()))
117    }
118
119    #[allow(
120        clippy::too_many_arguments,
121        reason = "code execution bridge carries explicit per-turn runtime dependencies"
122    )]
123    pub(crate) fn new(
124        session_id: String,
125        dispatch: Arc<ToolDispatchContext<'run>>,
126        process_env_store: Arc<dyn crate::ProcessExecutionEnvStore>,
127        attachment_store: Arc<dyn crate::AttachmentStore>,
128        chronological_projection: Arc<crate::ChronologicalProjection>,
129        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
130        turn_context: crate::TurnContext,
131    ) -> Self {
132        Self {
133            session_id,
134            dispatch,
135            process_env_store,
136            attachment_store,
137            chronological_projection,
138            protocol_extension,
139            turn_context,
140            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
141                crate::PluginOptions::default(),
142                crate::SessionPolicy::default(),
143            ),
144            process_originator: None,
145            runtime_process_id: None,
146            process_event_context: None,
147            started_process_ids: Arc::default(),
148            process_env_ref: None,
149            process_wake_target: None,
150            parent_invocation: None,
151            turn_phase_probe: None,
152            turn_event_tx: None,
153            cancellation_token: None,
154            tracing: None,
155            code_block_graph_key: None,
156            batch_parent_call_id: None,
157            process_work_driver: None,
158        }
159    }
160
161    pub fn session_id(&self) -> &str {
162        &self.session_id
163    }
164
165    pub fn execution_scope_id(&self) -> String {
166        self.dispatch
167            .effect_controller
168            .scoped()
169            .scope_id()
170            .to_string()
171    }
172
173    pub fn session_scope(&self) -> crate::SessionScope {
174        if self.dispatch.agent_frame_id.is_empty() {
175            crate::SessionScope::new(self.session_id.clone())
176        } else {
177            crate::SessionScope::for_agent_frame(
178                self.session_id.clone(),
179                self.dispatch.agent_frame_id.clone(),
180            )
181        }
182    }
183
184    pub fn trigger_store(&self) -> Option<Arc<dyn crate::TriggerStore>> {
185        self.dispatch
186            .trigger_router
187            .as_ref()
188            .map(crate::TriggerRouter::store)
189    }
190
191    pub fn trigger_registration_originator(&self) -> crate::ProcessOriginator {
192        self.process_originator
193            .clone()
194            .unwrap_or_else(|| crate::ProcessOriginator::session(self.session_scope()))
195    }
196
197    pub fn trigger_registration_wake_target(&self) -> Option<crate::SessionScope> {
198        self.process_wake_target
199            .clone()
200            .or_else(|| Some(self.session_scope()))
201    }
202
203    pub fn attachment_store(&self) -> Arc<dyn crate::AttachmentStore> {
204        Arc::clone(&self.attachment_store)
205    }
206
207    pub fn process_env_store(&self) -> Arc<dyn crate::ProcessExecutionEnvStore> {
208        Arc::clone(&self.process_env_store)
209    }
210
211    pub fn chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
212        Arc::clone(&self.chronological_projection)
213    }
214
215    pub fn protocol_extension<T: 'static>(&self) -> Option<&T> {
216        self.protocol_extension
217            .as_ref()
218            .and_then(|extension| extension.as_any().downcast_ref::<T>())
219    }
220
221    pub fn turn_context(&self) -> &crate::TurnContext {
222        &self.turn_context
223    }
224
225    pub fn tool_catalog(&self) -> Arc<crate::ToolCatalog> {
226        Arc::clone(&self.dispatch.tool_catalog)
227    }
228
229    pub(crate) fn session_graph_service(&self) -> &dyn crate::plugin::SessionGraphService {
230        self.dispatch.session_graph.as_ref()
231    }
232
233    pub(super) async fn emit_turn_activity(
234        &self,
235        correlation_id: TurnActivityId,
236        event: TurnEvent,
237    ) {
238        if let Some(tx) = &self.turn_event_tx {
239            let _ = tx.send(TurnActivity::new(correlation_id, event)).await;
240        }
241    }
242
243    pub(crate) fn with_turn_event_sender(mut self, turn_event_tx: Sender<TurnActivity>) -> Self {
244        self.turn_event_tx = Some(turn_event_tx);
245        self
246    }
247
248    pub(crate) fn with_tracing(mut self, tracing: Option<RuntimeExecutionTracing>) -> Self {
249        self.tracing = tracing;
250        self
251    }
252
253    pub(crate) fn with_code_block_graph_key(mut self, graph_key: Option<String>) -> Self {
254        self.code_block_graph_key = graph_key;
255        self
256    }
257
258    pub(crate) fn with_batch_parent_call_id(mut self, parent_call_id: Option<String>) -> Self {
259        self.batch_parent_call_id = parent_call_id;
260        self
261    }
262
263    /// Graph key of the enclosing code block for tool calls run from this
264    /// context, or `None` when no code block is executing.
265    pub(super) fn code_block_graph_key(&self) -> Option<String> {
266        self.code_block_graph_key.clone()
267    }
268
269    /// Parent batch call id for tool calls run from this context, or `None`
270    /// when this context is not executing batch children.
271    pub(super) fn batch_parent_call_id(&self) -> Option<String> {
272        self.batch_parent_call_id.clone()
273    }
274
275    /// Emit a `ToolCallStarted` trace event for a tool run from this context.
276    /// No-op when the host installed no trace sink.
277    pub(super) fn emit_tool_call_started_trace(
278        &self,
279        call_id: &str,
280        name: &str,
281        args: &serde_json::Value,
282    ) {
283        if let Some(tracing) = self.tracing.as_ref() {
284            tracing.emit(
285                lash_trace::TraceEvent::ToolCallStarted {
286                    call_id: Some(call_id.to_string()),
287                    name: name.to_string(),
288                    args: args.clone(),
289                },
290                self.dispatch.clock.as_ref(),
291            );
292        }
293    }
294
295    /// Emit a `ToolCallCompleted` trace event for a tool run from this context.
296    /// No-op when the host installed no trace sink.
297    pub(super) fn emit_tool_call_completed_trace(&self, record: &crate::ToolCallRecord) {
298        if let Some(tracing) = self.tracing.as_ref() {
299            tracing.emit(
300                lash_trace::TraceEvent::ToolCallCompleted {
301                    call_id: record.call_id.clone(),
302                    name: record.tool.clone(),
303                    args: record.args.clone(),
304                    output: crate::trace::trace_tool_call_output(&record.output),
305                    duration_ms: record.duration_ms,
306                },
307                self.dispatch.clock.as_ref(),
308            );
309        }
310    }
311
312    pub(crate) fn with_parent_invocation(mut self, metadata: crate::RuntimeInvocation) -> Self {
313        self.parent_invocation = Some(metadata);
314        self
315    }
316
317    pub(crate) fn with_execution_env_spec(
318        mut self,
319        execution_env_spec: crate::ProcessExecutionEnvSpec,
320    ) -> Self {
321        self.execution_env_spec = execution_env_spec;
322        self
323    }
324
325    pub(crate) fn with_process_registration_context(
326        mut self,
327        registration: &crate::ProcessRegistration,
328    ) -> Self {
329        self.process_originator = Some(registration.provenance.originator.clone());
330        self.runtime_process_id = Some(registration.id.clone());
331        self.process_env_ref = registration.env_ref.clone();
332        self.process_wake_target = registration.wake_target.clone();
333        self
334    }
335
336    pub(crate) fn with_process_event_context(
337        mut self,
338        process_id: impl Into<String>,
339        registry: Arc<dyn crate::ProcessRegistry>,
340        awaiter: crate::ProcessAwaiter,
341        store: Option<Arc<dyn crate::RuntimePersistence>>,
342        session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
343        queued_work_driver: Option<crate::QueuedWorkDriver>,
344    ) -> Self {
345        self.process_event_context = Some(RuntimeExecutionProcessEventContext {
346            process_id: process_id.into(),
347            registry,
348            awaiter,
349            store,
350            session_store_factory,
351            queued_work_driver,
352        });
353        self
354    }
355
356    /// Spawn provenance for children started by this context, present only
357    /// when this context executes a process: children inherit the chain's
358    /// originator and wake target instead of the ephemeral execution scope.
359    pub(super) fn record_started_process(&self, process_id: &str) {
360        self.started_process_ids
361            .lock()
362            .expect("started process ids lock")
363            .insert(process_id.to_string());
364    }
365
366    pub(super) fn is_run_local_process(&self, process_id: &str) -> bool {
367        self.started_process_ids
368            .lock()
369            .expect("started process ids lock")
370            .contains(process_id)
371    }
372
373    pub(crate) fn process_spawn_provenance(&self) -> Option<crate::ProcessSpawnProvenance> {
374        self.process_originator
375            .clone()
376            .map(|originator| crate::ProcessSpawnProvenance {
377                originator,
378                wake_target: self.process_wake_target.clone(),
379            })
380    }
381
382    pub(super) async fn attach_captured_process_execution_env(
383        &self,
384        registration: crate::ProcessRegistration,
385    ) -> Result<crate::ProcessRegistration, crate::PluginError> {
386        if registration.env_ref.is_some() {
387            return Ok(registration);
388        }
389        match registration.input.as_ref() {
390            crate::ProcessInput::ToolCall { .. } | crate::ProcessInput::Engine { .. } => {
391                let env_ref = self.captured_process_execution_env_ref().await?;
392                Ok(registration.with_execution_env_ref(Some(env_ref)))
393            }
394            crate::ProcessInput::External { .. } | crate::ProcessInput::SessionTurn { .. } => {
395                Ok(registration)
396            }
397        }
398    }
399
400    pub async fn captured_process_execution_env_ref(
401        &self,
402    ) -> Result<crate::ProcessExecutionEnvRef, crate::PluginError> {
403        if let Some(env_ref) = self.process_env_ref.clone() {
404            return Ok(env_ref);
405        }
406        crate::persist_process_execution_env(
407            self.process_env_store.as_ref(),
408            &self.execution_env_spec,
409        )
410        .await
411    }
412
413    pub(crate) fn with_turn_phase_probe(
414        mut self,
415        probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
416    ) -> Self {
417        self.turn_phase_probe = probe;
418        self
419    }
420
421    #[doc(hidden)]
422    pub fn named_phase(&self, phase: &'static str) -> crate::runtime::RuntimeNamedPhase {
423        crate::runtime::RuntimeNamedPhase::begin(self.turn_phase_probe.clone(), phase)
424    }
425
426    pub fn parent_invocation(&self) -> Option<&crate::RuntimeInvocation> {
427        self.parent_invocation.as_ref()
428    }
429
430    pub(crate) fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
431        self.cancellation_token = Some(cancellation_token);
432        self
433    }
434
435    pub(crate) fn with_process_work_driver(
436        mut self,
437        process_work_driver: Option<crate::ProcessWorkDriver>,
438    ) -> Self {
439        self.process_work_driver = process_work_driver;
440        self
441    }
442
443    pub(crate) fn tool_scheduling(&self, name: &str) -> crate::ToolScheduling {
444        crate::tool_dispatch::resolve_tool_scheduling(&self.dispatch, name)
445    }
446
447    pub fn callable_tool_manifest(&self, name: &str) -> Option<crate::ToolManifest> {
448        crate::tool_dispatch::resolve_callable_manifest(&self.dispatch, name)
449    }
450
451    pub fn callable_tool_manifest_by_id(&self, id: &crate::ToolId) -> Option<crate::ToolManifest> {
452        crate::tool_dispatch::resolve_callable_manifest_by_id(&self.dispatch, id)
453    }
454
455    pub fn tool_argument_projection_policy(
456        &self,
457        name: &str,
458    ) -> crate::ToolArgumentProjectionPolicy {
459        crate::tool_dispatch::resolve_tool_argument_projection_policy(&self.dispatch, name)
460    }
461
462    pub async fn start_child_process(
463        &self,
464        registration: crate::ProcessRegistration,
465        kind: impl Into<String>,
466        label: Option<String>,
467    ) -> crate::ToolInvocationReply {
468        let _phase = self.named_phase("process.start_child");
469        let registration = match self
470            .attach_captured_process_execution_env(registration)
471            .await
472        {
473            Ok(registration) => registration,
474            Err(err) => {
475                return crate::ToolInvocationReply::error(serde_json::json!(err.to_string()));
476            }
477        };
478        let process_id = registration.id.clone();
479        let mut options = crate::ProcessStartOptions::new()
480            .with_descriptor(crate::ProcessHandleDescriptor::new(Some(kind), label));
481        if let Some(spawn) = self.process_spawn_provenance() {
482            options = options.with_spawn_provenance(spawn);
483        }
484        match self
485            .dispatch
486            .processes
487            .start(
488                &self.session_id,
489                registration,
490                options,
491                self.process_scope(self.parent_invocation.clone()),
492            )
493            .await
494        {
495            Ok(_) => {
496                self.record_started_process(&process_id);
497                crate::ToolInvocationReply::success(Self::process_handle_json(&process_id))
498            }
499            Err(err) => crate::ToolInvocationReply::error(serde_json::json!(err.to_string())),
500        }
501    }
502
503    pub async fn sleep_process(
504        &self,
505        scope: &str,
506        sequence: u64,
507        duration_ms: u64,
508    ) -> Result<(), crate::RuntimeEffectControllerError> {
509        let cancellation = self.cancellation_token.clone().unwrap_or_default();
510        let invocation = crate::runtime::causal::process_sleep_invocation(
511            &self.session_id,
512            self.parent_invocation.as_ref(),
513            scope,
514            sequence,
515        );
516        let outcome = self
517            .dispatch
518            .effect_controller
519            .controller()
520            .execute_effect(
521                crate::RuntimeEffectEnvelope::new(
522                    invocation,
523                    crate::RuntimeEffectCommand::Sleep { duration_ms },
524                ),
525                crate::RuntimeEffectLocalExecutor::sleep_with_clock(
526                    cancellation,
527                    std::sync::Arc::clone(&self.dispatch.clock),
528                ),
529            )
530            .await?;
531        match outcome {
532            crate::RuntimeEffectOutcome::Sleep => Ok(()),
533            other => Err(crate::RuntimeEffectControllerError::new(
534                "runtime_effect_wrong_outcome",
535                format!("expected sleep outcome, got {}", other.kind().as_str()),
536            )),
537        }
538    }
539
540    pub async fn await_process_signal_event(
541        &self,
542        process_id: &str,
543        signal_name: &str,
544        event_ordinal: u64,
545    ) -> Result<serde_json::Value, crate::RuntimeEffectControllerError> {
546        let cancellation = self.cancellation_token.clone().unwrap_or_default();
547        let key = self
548            .dispatch
549            .effect_controller
550            .controller()
551            .await_event_key(
552                &crate::ExecutionScope::process(process_id),
553                crate::AwaitEventWaitIdentity::process_signal(
554                    process_id,
555                    signal_name,
556                    event_ordinal,
557                ),
558            )
559            .await?;
560        let invocation = crate::runtime::causal::process_await_event_invocation(
561            &self.session_id,
562            self.parent_invocation.as_ref(),
563            process_id,
564            signal_name,
565            event_ordinal,
566        );
567        let outcome = self
568            .dispatch
569            .effect_controller
570            .controller()
571            .execute_effect(
572                crate::RuntimeEffectEnvelope::new(
573                    invocation,
574                    crate::RuntimeEffectCommand::AwaitEvent { key },
575                ),
576                crate::RuntimeEffectLocalExecutor::await_event_with_clock(
577                    cancellation,
578                    None,
579                    std::sync::Arc::clone(&self.dispatch.clock),
580                ),
581            )
582            .await?;
583        match outcome.into_await_event()? {
584            crate::Resolution::Ok(value) => Ok(value),
585            crate::Resolution::Err(err) => Err(crate::RuntimeEffectControllerError::new(
586                err.code,
587                err.message,
588            )),
589            crate::Resolution::Timeout => Err(crate::RuntimeEffectControllerError::new(
590                "process_signal_wait_timeout",
591                "process signal wait timed out",
592            )),
593            crate::Resolution::Cancelled => Err(crate::RuntimeEffectControllerError::new(
594                "process_signal_wait_cancelled",
595                "process signal wait was cancelled",
596            )),
597        }
598    }
599
600    pub async fn signal_process_by_id(
601        &self,
602        registry: Arc<dyn crate::ProcessRegistry>,
603        process_id: &str,
604        signal_name: &str,
605        signal_id: String,
606        payload: serde_json::Value,
607    ) -> Result<crate::ProcessEvent, crate::RuntimeEffectControllerError> {
608        let event_type = crate::process_signal_event_type(signal_name)?;
609        let replay_key = format!("process:{process_id}:signal.{signal_name}:{signal_id}");
610        let signal_payload = payload.clone();
611        let command = crate::ProcessCommand::Signal {
612            process_id: process_id.to_string(),
613            signal_name: signal_name.to_string(),
614            signal_id,
615            request: crate::ProcessEventAppendRequest::new(event_type.clone(), payload)
616                .with_replay_key(replay_key),
617        };
618        let effect_id = command.effect_id();
619        let invocation = crate::runtime::causal::process_effect_invocation(
620            &self.session_id,
621            self.parent_invocation.clone(),
622            &effect_id,
623        );
624        let outcome = self
625            .dispatch
626            .effect_controller
627            .controller()
628            .execute_effect(
629                crate::RuntimeEffectEnvelope::new(
630                    invocation,
631                    crate::RuntimeEffectCommand::process(command),
632                ),
633                crate::RuntimeEffectLocalExecutor::processes(
634                    Arc::clone(&registry),
635                    self.process_work_driver.clone(),
636                ),
637            )
638            .await?;
639        match outcome.into_process()? {
640            crate::ProcessEffectOutcome::Signal { event } => {
641                let waiting_ordinal =
642                    registry
643                        .get_process(process_id)
644                        .await
645                        .and_then(|record| match record.wait {
646                            Some(crate::WaitState {
647                                kind:
648                                    crate::WaitKind::Signal {
649                                        name,
650                                        event_type: wait_event_type,
651                                        ordinal,
652                                        ..
653                                    },
654                                ..
655                            }) if name == signal_name && wait_event_type == event_type => {
656                                Some(ordinal)
657                            }
658                            _ => None,
659                        });
660                let ordinal = match waiting_ordinal {
661                    Some(ordinal) => ordinal,
662                    None => {
663                        registry
664                            .count_events_through(process_id, &event_type, event.sequence)
665                            .await?
666                    }
667                };
668                if ordinal > 0 {
669                    let key = self
670                        .dispatch
671                        .effect_controller
672                        .controller()
673                        .await_event_key(
674                            &crate::ExecutionScope::process(process_id),
675                            crate::AwaitEventWaitIdentity::process_signal(
676                                process_id,
677                                signal_name,
678                                ordinal,
679                            ),
680                        )
681                        .await?;
682                    let _ = self
683                        .dispatch
684                        .effect_controller
685                        .controller()
686                        .resolve_await_event(&key, crate::Resolution::Ok(signal_payload))
687                        .await?;
688                }
689                Ok(*event)
690            }
691            other => Err(crate::RuntimeEffectControllerError::new(
692                "runtime_effect_wrong_outcome",
693                format!("expected signal outcome, got {other:?}"),
694            )),
695        }
696    }
697
698    pub async fn append_process_event(
699        &self,
700        registry: Arc<dyn crate::ProcessRegistry>,
701        process_id: &str,
702        request: crate::ProcessEventAppendRequest,
703    ) -> Result<crate::ProcessEvent, crate::PluginError> {
704        let result = registry.append_event(process_id, request).await?;
705        if let Some(context) = self.process_event_context.as_ref() {
706            crate::tool_provider::process_events::enqueue_wake_delivery(
707                context.store.clone(),
708                context.session_store_factory.as_ref(),
709                result.wake_delivery,
710                Some(self.session_graph_service()),
711                context.queued_work_driver.as_ref(),
712            )
713            .await?;
714        }
715        Ok(result.event)
716    }
717}
718
719#[cfg(test)]
720mod tests {
721    use super::*;
722    use crate::tool_dispatch::ToolDispatchContext;
723    use crate::{ToolCall, ToolProvider, ToolResult};
724
725    struct NoopTools;
726
727    #[async_trait::async_trait]
728    impl ToolProvider for NoopTools {
729        fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
730            Vec::new()
731        }
732
733        fn resolve_contract(&self, _name: &str) -> Option<Arc<crate::ToolContract>> {
734            None
735        }
736
737        async fn execute(&self, _call: ToolCall<'_>) -> ToolResult {
738            ToolResult::err_fmt("not used")
739        }
740    }
741
742    #[test]
743    fn tool_argument_projection_policy_resolves_from_active_catalog_and_defaults_unknown() {
744        let tool = crate::ToolDefinition::raw(
745            "tool:seedy",
746            "seedy",
747            "Seed-aware",
748            crate::ToolDefinition::default_input_schema(),
749            serde_json::json!({ "type": "string" }),
750        )
751        .with_argument_projection(
752            crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed"),
753        );
754        let plugins = crate::plugin::PluginHost::empty()
755            .build_session("session", None)
756            .expect("plugin session");
757        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
758        let dispatch = Arc::new(ToolDispatchContext {
759            plugins,
760            tools: Arc::new(NoopTools),
761            tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
762                vec![tool.manifest()],
763                std::collections::BTreeMap::new(),
764            )),
765            sessions: Arc::new(crate::testing::MockSessionManager::default()),
766            session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
767            session_graph: Arc::new(crate::testing::MockSessionManager::default()),
768            processes: Arc::new(crate::UnavailableProcessService),
769            process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
770            trigger_router: None,
771            effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
772                crate::InlineRuntimeEffectController,
773            )),
774            direct_completions: crate::DirectCompletionClient::unavailable(
775                "direct completions are unavailable in this test context",
776            ),
777            parent_invocation: None,
778            execution_env_spec: crate::ProcessExecutionEnvSpec::new(
779                crate::PluginOptions::default(),
780                crate::SessionPolicy::default(),
781            ),
782            session_id: "session".to_string(),
783            agent_frame_id: String::new(),
784            event_tx,
785            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
786            trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
787            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
788            turn_context: crate::TurnContext::default(),
789            clock: std::sync::Arc::new(crate::SystemClock),
790        });
791        let ctx = RuntimeExecutionContext::new(
792            "session".to_string(),
793            dispatch,
794            Arc::new(crate::InMemoryProcessExecutionEnvStore::new()),
795            Arc::new(crate::InMemoryAttachmentStore::new()),
796            Arc::new(crate::ChronologicalProjection::default()),
797            None,
798            crate::TurnContext::default(),
799        );
800
801        assert_eq!(
802            ctx.tool_argument_projection_policy("seedy"),
803            crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed")
804        );
805        assert_eq!(
806            ctx.tool_argument_projection_policy("missing"),
807            crate::ToolArgumentProjectionPolicy::MaterializeProjectedValues
808        );
809    }
810}