// lash-core 0.1.0-alpha.37
//
// Sans-IO turn machine and runtime kernel for the lash agent runtime.
// Documentation
use super::*;

impl RuntimeTurnDriver<'_> {
    /// Runs a single turn: prepares a turn machine, then services its effects.
    ///
    /// `messages`, `event_tx` and `run_offset` are forwarded to
    /// `prepare_turn_machine`. When preparation yields its `Err` payload, that
    /// payload is handed back to the caller as `Ok` and no machine is run.
    /// Otherwise the prepared machine is passed to `run_machine` and its result
    /// is returned unchanged.
    pub(in crate::runtime) async fn run(
        &mut self,
        messages: crate::MessageSequence,
        event_tx: mpsc::Sender<RuntimeStreamEvent>,
        cancel: CancellationToken,
        run_offset: usize,
    ) -> Result<(crate::MessageSequence, usize), RuntimeError> {
        let prepared = self
            .prepare_turn_machine(messages, &event_tx, run_offset)
            .await;
        match prepared {
            Ok(machine) => self.run_machine(machine, event_tx, cancel, run_offset).await,
            // Preparation already produced the turn's outcome; surface it as success.
            Err(early_result) => Ok(early_result),
        }
    }

    /// Services effects polled from `machine` until it finishes.
    ///
    /// Each effect returned by `poll_effect` is dispatched to its handler; any
    /// handler error is propagated with `?`. An `Effect::Done` applies its event
    /// delta to the turn pipeline and returns its messages together with its
    /// protocol iteration. If `poll_effect` returns `None` before a `Done`
    /// effect is seen, a default `MessageSequence` paired with `run_offset` is
    /// returned instead.
    async fn run_machine(
        &mut self,
        mut machine: TurnMachine,
        event_tx: mpsc::Sender<RuntimeStreamEvent>,
        cancel: CancellationToken,
        run_offset: usize,
    ) -> Result<(crate::MessageSequence, usize), RuntimeError> {
        while let Some(effect) = machine.poll_effect() {
            match effect {
                Effect::Emit(event) => {
                    // Token-usage events also update the pipeline's bookkeeping
                    // before being forwarded on the event channel.
                    if let SessionEvent::TokenUsage {
                        usage, cumulative, ..
                    } = &event
                    {
                        self.turn_pipeline.state_mut().token_usage = cumulative.clone();
                        let prompt_usage = normalize_prompt_usage(self.policy.provider(), usage);
                        self.turn_pipeline.state_mut().last_prompt_usage = prompt_usage;
                    }
                    send_session_event(&event_tx, event).await
                }
                Effect::Log { event } => self.handle_log_event(event),
                // No driver-side action is taken for this effect.
                Effect::CancelLlm { .. } => {}
                Effect::Progress {
                    messages,
                    event_delta,
                    protocol_iteration,
                } => {
                    self.persist_progress_boundary(messages, event_delta, protocol_iteration)
                        .await?
                }
                Effect::Done {
                    messages,
                    event_delta,
                    protocol_iteration,
                } => {
                    self.turn_pipeline.apply_event_delta(event_delta);
                    return Ok((messages, protocol_iteration));
                }
                Effect::LlmCall { id, request } => {
                    self.handle_llm_call_effect(&mut machine, id, request, &event_tx, &cancel)
                        .await?;
                }
                Effect::ToolCalls { id, calls } => {
                    self.handle_tool_calls_effect(&mut machine, id, calls, &event_tx, &cancel)
                        .await?;
                }
                Effect::ExecCode { id, code } => {
                    self.handle_exec_code_effect(&mut machine, id, code, &event_tx, &cancel)
                        .await?;
                }
                Effect::Checkpoint { id, checkpoint } => {
                    self.handle_checkpoint_effect(&mut machine, id, checkpoint, &event_tx, &cancel)
                        .await?;
                }
                Effect::SyncExecutionSurface {
                    id,
                    update_machine_config,
                } => {
                    self.handle_execution_surface_sync_effect(
                        &mut machine,
                        id,
                        update_machine_config,
                        &event_tx,
                        &cancel,
                    )
                    .await?;
                }
            }
        }

        // The machine stopped yielding effects without reporting `Done`.
        Ok((crate::MessageSequence::default(), run_offset))
    }

    /// Records a progress boundary for the current turn when it is safe to do so.
    ///
    /// Nothing happens unless `crate::messages_are_prompt_resume_safe` accepts
    /// `messages`. Otherwise the turn pipeline's `progress_boundary` is invoked
    /// with the session, a clone of the policy, the turn index, `messages` and
    /// `event_delta`; if the returned boundary reports `persisted`, one trace
    /// event is emitted per protocol event under `protocol_iteration`.
    ///
    /// Every path through this function returns `Ok(())`.
    async fn persist_progress_boundary(
        &mut self,
        messages: crate::MessageSequence,
        event_delta: Vec<crate::SessionEventRecord>,
        protocol_iteration: usize,
    ) -> Result<(), RuntimeError> {
        let resume_safe = crate::messages_are_prompt_resume_safe(messages.iter());
        if !resume_safe {
            return Ok(());
        }

        let policy = self.policy.policy.clone();
        let outcome = self
            .turn_pipeline
            .progress_boundary(
                &mut self.session,
                policy,
                self.turn_index,
                messages,
                event_delta,
            )
            .await;
        if !outcome.persisted {
            return Ok(());
        }

        for protocol_event in &outcome.protocol_events {
            self.emit_trace(protocol_iteration, protocol_step_trace_event(protocol_event));
        }
        Ok(())
    }
}