lash-core 0.1.0-alpha.51

Sans-IO turn machine and runtime kernel for the lash agent runtime.
Documentation
use super::*;
use std::sync::Arc;

impl RuntimeSessionServices {
    pub(in crate::runtime::session_manager::process_runners) async fn run_process_tool_call(
        &self,
        run: ProcessToolCallRun<'_>,
    ) -> crate::ProcessAwaitOutput {
        let result = self.execute_process_tool_call(run).await;
        match result {
            Ok(output) => crate::ProcessAwaitOutput::from_tool_output(output),
            Err(err) => crate::ProcessAwaitOutput::from_tool_output(
                crate::ToolCallOutput::failure(crate::ToolFailure::runtime(
                    crate::ToolFailureClass::Internal,
                    "process_tool_failed",
                    err.to_string(),
                )),
            ),
        }
    }

    async fn execute_process_tool_call(
        &self,
        run: ProcessToolCallRun<'_>,
    ) -> Result<crate::ToolCallOutput, crate::PluginError> {
        let ProcessToolCallRun {
            registration,
            registry,
            call,
            parent_invocation,
            scoped_effect_controller,
            cancellation,
        } = run;
        let await_parent_invocation = parent_invocation.clone();
        let await_cancellation = cancellation.clone();
        let run_context = ProcessRunContext::builder(self)
            .tool_catalog(
                self.current
                    .plugins
                    .resolved_tool_catalog(&self.current.session_id)?,
            )
            .scoped_effect_controller(scoped_effect_controller)
            .causal_invocation(parent_invocation.clone())
            .dispatch_parent_invocation(parent_invocation)
            .build()?;
        let dispatch = run_context.dispatch();
        let tool_context = crate::ToolContext::from_dispatch(Arc::clone(&dispatch))
            .prepared_call(&call)
            .async_process(registration.id.clone(), cancellation)
            .process_events(
                registration.id.clone(),
                registry,
                self.current.store.clone(),
                self.current.host.session_store_factory.clone(),
                self.current.host.queued_work_poke.clone(),
            )
            .build();
        let launch = Box::pin(
            crate::tool_dispatch::dispatch_prepared_tool_call_launch_with_execution_context(
                dispatch.as_ref(),
                call,
                None,
                tool_context,
            ),
        )
        .await;
        let output = match launch {
            crate::tool_dispatch::ToolCallLaunch::Done(outcome) => outcome.record.output,
            crate::tool_dispatch::ToolCallLaunch::Pending(pending) => {
                let fallback;
                let parent = if let Some(parent) = await_parent_invocation.as_ref() {
                    parent
                } else {
                    fallback = crate::RuntimeInvocation::effect(
                        crate::RuntimeScope::new(&self.current.session_id),
                        format!(
                            "process:{}:tool:{}:await",
                            registration.id, pending.tool_name
                        ),
                        crate::RuntimeEffectKind::AwaitEvent,
                        format!(
                            "process:{}:tool:{}:await",
                            registration.id, pending.tool_name
                        ),
                    );
                    &fallback
                };
                let invocation = crate::runtime::causal::child_effect_invocation(
                    parent,
                    format!(
                        "process:{}:tool:{}:await",
                        registration.id, pending.tool_name
                    ),
                    crate::RuntimeEffectKind::AwaitEvent,
                    format!(
                        "process:{}:tool:{}:await",
                        registration.id, pending.tool_name
                    ),
                );
                let outcome = Box::pin(
                    dispatch.effect_controller.controller().execute_effect(
                        crate::RuntimeEffectEnvelope::new(
                            invocation,
                            crate::RuntimeEffectCommand::AwaitEvent { key: pending.key },
                        ),
                        crate::RuntimeEffectLocalExecutor::await_event(
                            await_cancellation.clone(),
                            pending
                                .pending
                                .deadline
                                .map(|duration| std::time::Instant::now() + duration),
                        ),
                    ),
                )
                .await
                .map_err(|err| crate::PluginError::Session(err.to_string()))?;
                let resolution = outcome
                    .into_await_event()
                    .map_err(|err| crate::PluginError::Session(err.to_string()))?;
                crate::tool_result::tool_output_from_completion_resolution(resolution)
            }
        };
        drop(dispatch);
        run_context.shutdown().await;
        Ok(output)
    }
}