lash-core 0.1.0-alpha.60

Sans-IO turn machine and runtime kernel for the lash agent runtime.
Documentation
use super::*;

pub(in crate::runtime) struct ToolBatchRunOutcome {
    pub launches: Vec<crate::runtime::ToolCallLaunch>,
    pub triggers: Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>,
}

impl RuntimeTurnDriver<'_> {
    pub(super) async fn invoke_turn_tool_calls_effect(
        &mut self,
        machine: &mut TurnMachine,
        id: crate::sansio::EffectId,
        calls: Vec<crate::sansio::PendingToolCall>,
        event_tx: &mpsc::Sender<RuntimeStreamEvent>,
        cancel: &CancellationToken,
    ) -> Result<Vec<crate::sansio::CompletedToolCall>, RuntimeEffectControllerError> {
        let (tool_event_tx, mut tool_event_rx) = tokio::sync::mpsc::channel::<SessionEvent>(64);
        let runtime_event_tx = event_tx.clone();
        let tool_event_forwarder = tokio::spawn(async move {
            while let Some(event) = tool_event_rx.recv().await {
                send_session_event(&runtime_event_tx, event).await;
            }
        });
        let prepare_context = self
            .execution_context(
                tool_event_tx.clone(),
                Arc::new(crate::ChronologicalProjection::default()),
            )
            .map_err(|err| {
                RuntimeEffectControllerError::new("tool_catalog_resolution_failed", err.to_string())
            })?;
        let call_count = calls.len();
        let mut results = vec![None; call_count];
        let mut prepared_entries = Vec::new();
        for (index, call) in calls.into_iter().enumerate() {
            let call_id = call.call_id.clone();
            let replay = call.replay.clone();
            match prepare_context.prepare_tool_call(call).await {
                crate::tool_dispatch::ToolPreparationOutcome::Prepared(prepared) => {
                    prepared_entries.push((index, prepared));
                }
                crate::tool_dispatch::ToolPreparationOutcome::Completed(outcome) => {
                    let completed = prepare_context
                        .complete_tool_call(
                            index,
                            call_id.clone(),
                            replay,
                            *outcome,
                            crate::TurnActivityId::new(format!("tool:{call_id}")),
                        )
                        .await
                        .completed;
                    results[index] = Some(completed);
                }
            }
        }

        if !prepared_entries.is_empty() {
            let parent_invocation =
                self.turn_effect_invocation(machine, id, RuntimeEffectKind::ToolBatch)?;
            let batch = crate::PreparedToolBatch::new(
                id.0.to_string(),
                prepared_entries
                    .iter()
                    .map(|(_, prepared)| prepared.clone())
                    .collect(),
            );
            let outcome = self
                .execute_typed_turn_effect(
                    machine,
                    event_tx,
                    cancel,
                    RuntimeEffectEnvelope::new(
                        parent_invocation,
                        RuntimeEffectCommand::ToolBatch { batch },
                    ),
                    RuntimeEffectOutcome::into_tool_batch_effect,
                )
                .await?;
            if outcome.launches.len() != prepared_entries.len() {
                return Err(RuntimeEffectControllerError::new(
                    "tool_batch_result_count_mismatch",
                    format!(
                        "tool batch returned {} launches for {} prepared calls",
                        outcome.launches.len(),
                        prepared_entries.len()
                    ),
                ));
            }
            for ((source_index, prepared), launch) in
                prepared_entries.into_iter().zip(outcome.launches)
            {
                let call_id = prepared.call_id.clone();
                let replay = prepared.replay.clone();
                match launch {
                    crate::runtime::ToolCallLaunch::Done { result } => {
                        results[source_index] = Some(result);
                    }
                    crate::runtime::ToolCallLaunch::Pending {
                        key,
                        pending,
                        duration_ms,
                    } => {
                        let resolution = self
                            .await_pending_tool_completion(
                                machine, id, &call_id, key, &pending, event_tx, cancel,
                            )
                            .await?;
                        let dispatch_outcome = prepare_context
                            .pending_completion_dispatch_outcome(
                                prepared.tool_name.clone(),
                                prepared.args.clone(),
                                resolution,
                                duration_ms,
                            )
                            .await;
                        let completed = prepare_context
                            .complete_tool_call(
                                source_index,
                                call_id.clone(),
                                replay,
                                dispatch_outcome,
                                crate::TurnActivityId::new(format!("tool:{call_id}")),
                            )
                            .await
                            .completed;
                        send_turn_activity(
                            event_tx,
                            crate::TurnActivityId::new(format!("tool:{call_id}")),
                            crate::TurnEvent::ToolCallCompleted {
                                call_id: Some(call_id.clone()),
                                name: completed.tool_name.clone(),
                                args: completed.args.clone(),
                                output: completed.output.clone(),
                                duration_ms: completed.duration_ms,
                            },
                        )
                        .await;
                        results[source_index] = Some(completed);
                    }
                }
            }
        }
        drop(prepare_context);
        drop(tool_event_tx);
        let _ = tool_event_forwarder.await;
        results
            .into_iter()
            .enumerate()
            .map(|(index, result)| {
                result.ok_or_else(|| {
                    RuntimeEffectControllerError::new(
                        "tool_batch_missing_result",
                        format!("tool batch did not fill result slot {index}"),
                    )
                })
            })
            .collect()
    }

    pub(in crate::runtime) async fn run_tool_calls(
        &mut self,
        pending_tools: Vec<(crate::PreparedToolCall, crate::RuntimeInvocation)>,
        event_tx: &mpsc::Sender<RuntimeStreamEvent>,
        cancel: &CancellationToken,
    ) -> Result<ToolBatchRunOutcome, crate::RuntimeEffectControllerError> {
        let mut launches = Vec::with_capacity(pending_tools.len());
        let mut triggers = Vec::new();
        for (prepared, invocation) in pending_tools {
            let batch = crate::PreparedToolBatch::new(prepared.call_id.clone(), vec![prepared]);
            let mut outcome = self
                .run_tool_batch(batch, invocation, event_tx, cancel)
                .await?;
            launches.append(&mut outcome.launches);
            triggers.append(&mut outcome.triggers);
        }
        Ok(ToolBatchRunOutcome { launches, triggers })
    }

    pub(in crate::runtime) async fn run_tool_batch(
        &mut self,
        batch: crate::PreparedToolBatch,
        invocation: crate::RuntimeInvocation,
        event_tx: &mpsc::Sender<RuntimeStreamEvent>,
        cancel: &CancellationToken,
    ) -> Result<ToolBatchRunOutcome, crate::RuntimeEffectControllerError> {
        let (tool_event_tx, mut tool_event_rx) = tokio::sync::mpsc::channel::<SessionEvent>(64);
        let (turn_event_tx, mut turn_event_rx) = tokio::sync::mpsc::channel::<TurnActivity>(64);
        let runtime_event_tx = event_tx.clone();
        let tool_event_forwarder = tokio::spawn(async move {
            while let Some(event) = tool_event_rx.recv().await {
                send_session_event(&runtime_event_tx, event).await;
            }
        });
        let runtime_event_tx = event_tx.clone();
        let turn_event_forwarder = tokio::spawn(async move {
            while let Some(event) = turn_event_rx.recv().await {
                let _ = runtime_event_tx.send(RuntimeStreamEvent::Turn(event)).await;
            }
        });
        let context = match self.execution_context(
            tool_event_tx.clone(),
            Arc::new(crate::ChronologicalProjection::default()),
        ) {
            Ok(context) => context
                .with_turn_event_sender(turn_event_tx.clone())
                .with_cancellation_token(cancel.clone()),
            Err(err) => {
                drop(tool_event_tx);
                drop(turn_event_tx);
                let _ = tool_event_forwarder.await;
                let _ = turn_event_forwarder.await;
                return Err(crate::RuntimeEffectControllerError::new(
                    "tool_catalog_resolution_failed",
                    err.to_string(),
                ));
            }
        };
        let outcome = context
            .execute_prepared_tool_batch_launches(
                batch,
                invocation,
                std::collections::HashMap::new(),
            )
            .await?;
        drop(context);
        drop(tool_event_tx);
        drop(turn_event_tx);
        let _ = tool_event_forwarder.await;
        let _ = turn_event_forwarder.await;
        Ok(ToolBatchRunOutcome {
            launches: outcome.launches,
            triggers: outcome.triggers,
        })
    }

    async fn await_pending_tool_completion(
        &mut self,
        machine: &mut TurnMachine,
        parent_effect_id: crate::sansio::EffectId,
        call_id: &str,
        key: crate::AwaitEventKey,
        _pending: &crate::PendingCompletion,
        event_tx: &mpsc::Sender<RuntimeStreamEvent>,
        cancel: &CancellationToken,
    ) -> Result<crate::Resolution, RuntimeEffectControllerError> {
        let parent =
            self.turn_effect_invocation(machine, parent_effect_id, RuntimeEffectKind::ToolBatch)?;
        let invocation = crate::runtime::causal::child_effect_invocation(
            &parent,
            format!("{}:{call_id}:await", parent_effect_id.0),
            RuntimeEffectKind::AwaitEvent,
            format!("{call_id}:await"),
        );
        let _ = event_tx;
        let scoped_effect_controller = self.scoped_effect_controller.clone();
        let deadline = _pending
            .deadline
            .map(|duration| self.host.core.clock.now() + duration);
        let outcome = scoped_effect_controller
            .controller()
            .execute_effect(
                RuntimeEffectEnvelope::new(invocation, RuntimeEffectCommand::AwaitEvent { key }),
                crate::RuntimeEffectLocalExecutor::await_event_with_clock(
                    cancel.clone(),
                    deadline,
                    Arc::clone(&self.host.core.clock),
                ),
            )
            .await?;
        RuntimeEffectOutcome::into_await_event(outcome)
    }
}