lash-core 0.1.0-alpha.37

Sans-IO turn machine and runtime kernel for the lash agent runtime.
Documentation
use super::*;

impl RuntimeTurnDriver<'_> {
    pub(super) async fn handle_llm_call_effect(
        &mut self,
        machine: &mut TurnMachine,
        id: crate::sansio::EffectId,
        request: Arc<LlmRequest>,
        event_tx: &mpsc::Sender<RuntimeStreamEvent>,
        cancel: &CancellationToken,
    ) -> Result<(), RuntimeError> {
        if cancel.is_cancelled() {
            send_session_event(event_tx, SessionEvent::Done).await;
            machine.finish_with_outcome(crate::TurnOutcome::Stopped(TurnStop::Cancelled));
            return Ok(());
        }
        match self.before_llm_call(machine, &request).await {
            Ok(Some(crate::ProtocolLlmCallAction::SwitchAgentFrame { frame_id })) => {
                machine.finish_with_outcome(crate::TurnOutcome::AgentFrameSwitch { frame_id });
                return Ok(());
            }
            Ok(None) => {}
            Err(err) => {
                let err_string = err.to_string();
                if self.should_abort_for_runtime_effect_error() {
                    return Err(RuntimeError::new(
                        RuntimeErrorCode::ProtocolBeforeLlmCall,
                        err_string,
                    ));
                }
                machine.fail_turn(make_error_event(
                    "protocol_before_llm_call",
                    Some("before_llm_call_failed"),
                    err_string.clone(),
                    Some(err_string),
                ));
                return Ok(());
            }
        }
        let (result, text_streamed) = match self
            .invoke_turn_llm_effect(machine, id, request, event_tx, cancel)
            .await
        {
            Ok(result) => result,
            Err(err) => {
                self.fail_or_abort_runtime_effect_controller(machine, err)?;
                return Ok(());
            }
        };
        if let Ok(response) = &result {
            let usage = crate::runtime::effect::token_usage_from_llm(&response.usage);
            self.turn_pipeline.state_mut().last_prompt_usage =
                normalize_prompt_usage(self.policy.provider(), &usage);
            if !text_streamed {
                let prose_projector = self.session.plugins().assistant_prose_projector();
                emit_semantic_response_parts(event_tx, response, prose_projector.as_deref()).await;
            }
        }
        machine.handle_response(Response::LlmComplete {
            id,
            result,
            text_streamed,
        });
        Ok(())
    }

    pub(super) async fn handle_checkpoint_effect(
        &mut self,
        machine: &mut TurnMachine,
        id: crate::sansio::EffectId,
        checkpoint: CheckpointKind,
        event_tx: &mpsc::Sender<RuntimeStreamEvent>,
        cancel: &CancellationToken,
    ) -> Result<(), RuntimeError> {
        let result = self
            .invoke_turn_checkpoint_effect(machine, id, checkpoint, event_tx, cancel)
            .await;
        match result {
            Ok(delivery) => {
                machine.handle_response(Response::Checkpoint { id, delivery });
            }
            Err(err) => {
                self.fail_or_abort_runtime_effect_controller(machine, err.into())?;
            }
        }
        Ok(())
    }

    pub(super) async fn handle_execution_surface_sync_effect(
        &mut self,
        machine: &mut TurnMachine,
        id: crate::sansio::EffectId,
        update_machine_config: bool,
        event_tx: &mpsc::Sender<RuntimeStreamEvent>,
        cancel: &CancellationToken,
    ) -> Result<(), RuntimeError> {
        let result = match self
            .invoke_turn_execution_surface_sync_effect(
                machine,
                id,
                update_machine_config,
                event_tx,
                cancel,
            )
            .await
        {
            Ok(result) => result,
            Err(err) => {
                self.fail_or_abort_runtime_effect_controller(machine, err)?;
                return Ok(());
            }
        };
        machine.handle_response(Response::ExecutionSurfaceSynced { id, result });
        Ok(())
    }

    pub(super) async fn handle_tool_calls_effect(
        &mut self,
        machine: &mut TurnMachine,
        id: crate::sansio::EffectId,
        calls: Vec<crate::sansio::PendingToolCall>,
        event_tx: &mpsc::Sender<RuntimeStreamEvent>,
        cancel: &CancellationToken,
    ) -> Result<(), RuntimeError> {
        if self.host.core.tracing.trace_sink.is_some() {
            for pending in &calls {
                self.emit_tool_call_started_trace(
                    machine.protocol_iteration(),
                    Some(pending.call_id.clone()),
                    pending.tool_name.clone(),
                    pending.args.clone(),
                );
            }
        }
        let results = match self
            .invoke_turn_tool_calls_effect(machine, id, calls, event_tx, cancel)
            .await
        {
            Ok(results) => results,
            Err(err) => {
                self.fail_or_abort_runtime_effect_controller(machine, err)?;
                return Ok(());
            }
        };
        if self.host.core.tracing.trace_sink.is_some() {
            for outcome in &results {
                let record = ToolCallRecord {
                    call_id: Some(outcome.call_id.clone()),
                    tool: outcome.tool_name.clone(),
                    args: outcome.args.clone(),
                    output: outcome.output.clone(),
                    duration_ms: outcome.duration_ms,
                };
                self.emit_tool_call_trace(machine.protocol_iteration(), &record);
            }
        }
        machine.handle_response(Response::ToolResults { id, results });
        Ok(())
    }

    pub(super) async fn handle_exec_code_effect(
        &mut self,
        machine: &mut TurnMachine,
        id: crate::sansio::EffectId,
        code: String,
        event_tx: &mpsc::Sender<RuntimeStreamEvent>,
        cancel: &CancellationToken,
    ) -> Result<(), RuntimeError> {
        let code_correlation_id = TurnActivityId::new(format!("code:{id:?}"));
        let iteration = machine.protocol_iteration();
        if self.host.core.tracing.trace_sink.is_some() {
            self.emit_protocol_diagnostic_trace(
                iteration,
                "exec_code_started",
                serde_json::json!({
                    "code": code,
                    "code_chars": code.chars().count(),
                }),
            );
        }
        let invocation = match self.turn_effect_invocation(machine, id, RuntimeEffectKind::ExecCode)
        {
            Ok(invocation) => invocation,
            Err(err) => {
                let message = err.to_string();
                send_turn_activity(
                    event_tx,
                    code_correlation_id.clone(),
                    TurnEvent::CodeBlockStarted {
                        language: "lashlang".to_string(),
                        code: code.clone(),
                        graph_key: None,
                    },
                )
                .await;
                send_turn_activity(
                    event_tx,
                    code_correlation_id.clone(),
                    TurnEvent::CodeBlockCompleted {
                        language: "lashlang".to_string(),
                        output: String::new(),
                        error: Some(message),
                        success: false,
                        duration_ms: 0,
                        tool_call_ids: Vec::new(),
                        graph_key: None,
                    },
                )
                .await;
                self.fail_or_abort_runtime_effect_controller(machine, err)?;
                return Ok(());
            }
        };
        let graph_key = foreground_exec_graph_key(&invocation);
        send_turn_activity(
            event_tx,
            code_correlation_id.clone(),
            TurnEvent::CodeBlockStarted {
                language: "lashlang".to_string(),
                code: code.clone(),
                graph_key: graph_key.clone(),
            },
        )
        .await;
        let exec_created_at = std::time::Instant::now();
        let result = match self
            .invoke_turn_exec_effect(machine, invocation, code.clone(), event_tx, cancel)
            .await
        {
            Ok(result) => result,
            Err(err) => {
                let message = err.to_string();
                send_turn_activity(
                    event_tx,
                    code_correlation_id.clone(),
                    TurnEvent::CodeBlockCompleted {
                        language: "lashlang".to_string(),
                        output: String::new(),
                        error: Some(message),
                        success: false,
                        duration_ms: exec_created_at.elapsed().as_millis() as u64,
                        tool_call_ids: Vec::new(),
                        graph_key: graph_key.clone(),
                    },
                )
                .await;
                self.fail_or_abort_runtime_effect_controller(machine, err)?;
                return Ok(());
            }
        };
        match &result {
            Ok(output) => {
                send_turn_activity(
                    event_tx,
                    code_correlation_id.clone(),
                    TurnEvent::CodeBlockCompleted {
                        language: "lashlang".to_string(),
                        output: output.observations.join("\n"),
                        error: output.error.clone(),
                        success: output.error.is_none(),
                        duration_ms: output.duration_ms,
                        tool_call_ids: output
                            .tool_calls
                            .iter()
                            .filter_map(|record| record.call_id.clone())
                            .collect(),
                        graph_key: graph_key.clone(),
                    },
                )
                .await;
            }
            Err(error) => {
                send_turn_activity(
                    event_tx,
                    code_correlation_id.clone(),
                    TurnEvent::CodeBlockCompleted {
                        language: "lashlang".to_string(),
                        output: String::new(),
                        error: Some(error.clone()),
                        success: false,
                        duration_ms: exec_created_at.elapsed().as_millis() as u64,
                        tool_call_ids: Vec::new(),
                        graph_key: graph_key.clone(),
                    },
                )
                .await;
            }
        }
        if let Ok(output) = &result {
            if self.host.core.tracing.trace_sink.is_some() {
                let observations_text = output.observations.join("\n");
                self.emit_protocol_diagnostic_trace(
                    iteration,
                    "exec_code_completed",
                    serde_json::json!({
                        "duration_ms": output.duration_ms,
                        "output": observations_text,
                        "output_chars": observations_text.chars().count(),
                        "observation_count": output.observations.len(),
                        "observation_truncation": output.observation_truncation,
                        "error": output.error,
                        "terminal_finish": output.terminal_finish,
                        "terminal_finish_present": output.terminal_finish.is_some(),
                        "tool_call_count": output.tool_calls.len(),
                    }),
                );
                if !output.observation_truncation.is_empty() {
                    self.emit_protocol_diagnostic_trace(
                        iteration,
                        "observation_projection",
                        serde_json::json!({
                            "projections": output.observation_truncation,
                        }),
                    );
                }
            }
        } else if let Err(error) = &result
            && self.host.core.tracing.trace_sink.is_some()
        {
            self.emit_protocol_diagnostic_trace(
                iteration,
                "exec_code_failed",
                serde_json::json!({ "error": error }),
            );
        }
        machine.handle_response(match result {
            Ok(output) => Response::ExecResult {
                id,
                result: Ok(output),
            },
            Err(error) => Response::ExecResult {
                id,
                result: Err(error),
            },
        });
        Ok(())
    }
}

fn foreground_exec_graph_key(invocation: &RuntimeInvocation) -> Option<String> {
    let RuntimeSubject::Effect { effect_id, kind } = &invocation.subject else {
        return None;
    };
    if *kind != RuntimeEffectKind::ExecCode {
        return None;
    }
    Some(match invocation.scope.turn_id.as_deref() {
        Some(turn_id) if !turn_id.is_empty() => {
            format!(
                "effect:{}:{turn_id}:{effect_id}",
                invocation.scope.session_id
            )
        }
        _ => format!("effect:{}:{effect_id}", invocation.scope.session_id),
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn foreground_exec_graph_key_uses_runtime_invocation_identity() {
        let invocation = RuntimeInvocation::effect(
            RuntimeScope::for_turn("session-1", "turn-1", 2, 3),
            "effect-7",
            RuntimeEffectKind::ExecCode,
            "replay-key",
        );

        assert_eq!(
            foreground_exec_graph_key(&invocation).as_deref(),
            Some("effect:session-1:turn-1:effect-7")
        );
    }
}