lash-core 0.1.0-alpha.37

Sans-IO turn machine and runtime kernel for the lash agent runtime.
Documentation
use super::*;

impl RuntimeTurnDriver<'_> {
    /// Dispatches a `Checkpoint` effect for effect `id` and returns the resulting delivery.
    ///
    /// The invocation is resolved through `turn_effect_invocation`, paired with a
    /// `RuntimeEffectCommand::Checkpoint` command, and handed to
    /// `execute_typed_turn_effect`, which extracts the typed outcome via
    /// `RuntimeEffectOutcome::into_checkpoint`.
    ///
    /// # Errors
    ///
    /// Every `RuntimeEffectControllerError` — raised while resolving the invocation, while
    /// executing the effect, or carried inside the typed outcome — is converted with
    /// `RuntimeEffectControllerError::into_runtime_error`.
    pub(super) async fn invoke_turn_checkpoint_effect(
        &mut self,
        machine: &mut TurnMachine,
        id: crate::sansio::EffectId,
        checkpoint: CheckpointKind,
        event_tx: &mpsc::Sender<RuntimeStreamEvent>,
        cancel: &CancellationToken,
    ) -> Result<crate::CheckpointDelivery, RuntimeError> {
        let invocation = self
            .turn_effect_invocation(machine, id, RuntimeEffectKind::Checkpoint)
            .map_err(RuntimeEffectControllerError::into_runtime_error)?;
        let command = RuntimeEffectCommand::Checkpoint { checkpoint };
        let envelope = RuntimeEffectEnvelope::new(invocation, command);
        let nested = self
            .execute_typed_turn_effect(
                machine,
                event_tx,
                cancel,
                envelope,
                RuntimeEffectOutcome::into_checkpoint,
            )
            .await;
        // The outer and inner layers share one error type, so both collapse into one arm.
        match nested {
            Ok(Ok(delivery)) => Ok(delivery),
            Ok(Err(err)) | Err(err) => Err(RuntimeEffectControllerError::into_runtime_error(err)),
        }
    }

    /// Dispatches a `SyncExecutionSurface` effect for effect `id`.
    ///
    /// `update_machine_config` is forwarded unchanged inside the command payload.
    ///
    /// # Errors
    ///
    /// The outer `Result` reports controller-level failures, including a failure to resolve
    /// the invocation. The inner `Result` is the typed outcome extracted by
    /// `RuntimeEffectOutcome::into_sync_execution_surface`.
    pub(super) async fn invoke_turn_execution_surface_sync_effect(
        &mut self,
        machine: &mut TurnMachine,
        id: crate::sansio::EffectId,
        update_machine_config: bool,
        event_tx: &mpsc::Sender<RuntimeStreamEvent>,
        cancel: &CancellationToken,
    ) -> Result<
        Result<Option<crate::sansio::ExecutionSurfaceSync>, String>,
        RuntimeEffectControllerError,
    > {
        let invocation =
            self.turn_effect_invocation(machine, id, RuntimeEffectKind::SyncExecutionSurface)?;
        let command = RuntimeEffectCommand::SyncExecutionSurface {
            update_machine_config,
        };
        let envelope = RuntimeEffectEnvelope::new(invocation, command);
        self.execute_typed_turn_effect(
            machine,
            event_tx,
            cancel,
            envelope,
            RuntimeEffectOutcome::into_sync_execution_surface,
        )
        .await
    }

    /// Dispatches an `ExecCode` effect that carries `code` under the supplied `invocation`.
    ///
    /// Unlike the sibling dispatchers, the invocation is provided by the caller rather than
    /// resolved here.
    ///
    /// # Errors
    ///
    /// The outer `Result` reports controller-level failures. The inner `Result` is the typed
    /// outcome extracted by `RuntimeEffectOutcome::into_exec_code`.
    pub(super) async fn invoke_turn_exec_effect(
        &mut self,
        machine: &mut TurnMachine,
        invocation: crate::RuntimeInvocation,
        code: String,
        event_tx: &mpsc::Sender<RuntimeStreamEvent>,
        cancel: &CancellationToken,
    ) -> Result<Result<crate::ExecResponse, String>, RuntimeEffectControllerError> {
        let command = RuntimeEffectCommand::ExecCode { code };
        let envelope = RuntimeEffectEnvelope::new(invocation, command);
        self.execute_typed_turn_effect(
            machine,
            event_tx,
            cancel,
            envelope,
            RuntimeEffectOutcome::into_exec_code,
        )
        .await
    }

    /// Runs one checkpoint pass and collects everything it delivers to the turn.
    ///
    /// Steps, in order:
    /// 1. Drain `self.checkpoint_messages`.
    /// 2. If the session has a history store, claim ready queued work for this session and
    ///    turn. A returned claim is materialized, announced (stream event plus a
    ///    `queued_work.claimed` trace event), merged into the delivery, and parked in
    ///    `self.pending_queue_claims`.
    /// 3. Run the plugins' checkpoint hook, append its messages, and forward its events.
    /// 4. Emit `InjectedMessagesCommitted` when any committed messages were collected.
    ///
    /// # Errors
    ///
    /// Fails when draining, claiming, materializing, or the plugin hook fails. Also fails
    /// when the hook result carries an abort, in which case the abort's own code and message
    /// are returned (after the hook's events have been forwarded).
    pub(in crate::runtime) async fn run_checkpoint(
        &mut self,
        machine: &mut TurnMachine,
        checkpoint: CheckpointKind,
        event_tx: &mpsc::Sender<RuntimeStreamEvent>,
    ) -> Result<crate::CheckpointDelivery, RuntimeError> {
        let mut committed = self.checkpoint_messages.drain().map_err(|source| {
            RuntimeError::new(
                RuntimeErrorCode::Other("checkpoint_messages".to_string()),
                source,
            )
        })?;
        let mut transient_messages = Vec::new();
        let mut turn_causes = Vec::new();

        // Without a history store there is nothing to claim.
        let claimed = match self.session.history_store() {
            None => None,
            Some(store) => store
                .claim_ready_queued_work(
                    &self.session_id,
                    &self.turn_id,
                    crate::QueuedWorkClaimBoundary::ActiveTurnCheckpoint,
                    crate::QUEUED_WORK_CLAIM_TTL_MS,
                    64,
                )
                .await
                .map_err(|source| {
                    RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, source.to_string())
                })?,
        };

        if let Some(claim) = claimed {
            let accepted_inputs = claim.accepted_turn_inputs();
            let materialized = claim
                .materialize_for_checkpoint_with_attachments(
                    self.host.core.durability.attachment_store.as_ref(),
                )
                .map_err(|source| {
                    RuntimeError::new(
                        RuntimeErrorCode::DurableStoreRequired {
                            facet: crate::DurableStoreFacet::AttachmentStore,
                        },
                        source,
                    )
                })?;

            // Announce the claim on the stream first, then record it in the trace.
            send_queued_work_started_event(
                event_tx,
                crate::QueuedWorkClaimBoundary::ActiveTurnCheckpoint,
                &claim,
                materialized.turn_causes.clone(),
            )
            .await;
            let iteration = machine.protocol_iteration();
            let claimed_trace = lash_trace::TraceEvent::Custom {
                name: "queued_work.claimed".to_string(),
                payload: queued_work_trace_payload(
                    crate::QueuedWorkClaimBoundary::ActiveTurnCheckpoint,
                    &claim,
                    &materialized.turn_causes,
                ),
            };
            self.emit_trace(iteration, claimed_trace);

            // Fold the materialized work into this checkpoint's delivery.
            committed.extend(materialized.messages);
            transient_messages.extend(materialized.transient_messages);
            turn_causes.extend(materialized.turn_causes);

            if !accepted_inputs.is_empty() {
                let accepted_event = SessionEvent::InjectedTurnInputAccepted {
                    inputs: accepted_inputs,
                    checkpoint,
                };
                send_session_event(event_tx, accepted_event).await;
            }
            self.pending_queue_claims.push(claim);
        }

        let plugins = Arc::clone(self.session.plugins());
        let hook_context = CheckpointHookContext {
            session_id: self.session_id.clone(),
            checkpoint,
            state: self.checkpoint_state_view(
                machine.message_sequence(),
                machine.protocol_iteration(),
            ),
            sessions: self.session_services.state_service(),
            session_lifecycle: self.session_services.lifecycle_service(),
            session_graph: self.session_services.graph_service(),
        };
        let applied = plugins
            .apply_checkpoint(hook_context)
            .await
            .map_err(|source| {
                RuntimeError::new(RuntimeErrorCode::PluginCheckpoint, source.to_string())
            })?;
        committed.extend(applied.messages);
        // Hook events are forwarded even when the hook also requests an abort.
        emit_session_events(event_tx, applied.events).await;
        if let Some(abort) = applied.abort {
            return Err(RuntimeError::new(abort.code, abort.message));
        }

        if !committed.is_empty() {
            let committed_event = SessionEvent::InjectedMessagesCommitted {
                messages: committed.clone(),
                checkpoint,
            };
            send_session_event(event_tx, committed_event).await;
        }

        Ok(crate::CheckpointDelivery {
            messages: committed,
            transient_messages,
            turn_causes,
        })
    }

    /// Executes `code` through the session's plugin-provided code executor, relaying
    /// everything the execution emits onto `event_tx`.
    ///
    /// A relay task is spawned for the duration of the call. It forwards:
    /// - sandbox messages as `SessionEvent::Message`, except those whose kind is
    ///   `"lashlang_code"` (and none at all once `event_tx` is closed),
    /// - session events via `send_session_event`,
    /// - turn activity as `RuntimeStreamEvent::Turn`.
    ///
    /// The relay only terminates once all three source channels are closed. Every exit
    /// path therefore runs the same teardown: drop the local senders, clear the session's
    /// message sender, and await the relay task. This includes the path where the execution
    /// context cannot be built; an early return there would leave the sandbox sender
    /// installed on the session and the relay task detached.
    ///
    /// # Errors
    ///
    /// Returns the stringified error when the execution context cannot be built or the
    /// executor fails, and `SessionError::CodeExecutionUnavailable` (stringified) when no
    /// code executor is registered.
    pub(in crate::runtime) async fn run_exec_code(
        &mut self,
        code: &str,
        messages: crate::MessageSequence,
        protocol_iteration: usize,
        invocation: crate::RuntimeInvocation,
        event_tx: &mpsc::Sender<RuntimeStreamEvent>,
    ) -> Result<crate::ExecResponse, String> {
        let (session_event_tx, mut session_event_rx) = mpsc::channel::<SessionEvent>(100);
        let (turn_event_tx, mut turn_event_rx) = mpsc::channel::<TurnActivity>(100);
        let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::<SandboxMessage>();
        self.session.set_message_sender(msg_tx);
        let relay_tx = event_tx.clone();
        let relay_handle = tokio::spawn(async move {
            let mut sandbox_closed = false;
            let mut session_closed = false;
            let mut turn_closed = false;
            // Keep relaying until every source channel has reported closure.
            while !(sandbox_closed && session_closed && turn_closed) {
                tokio::select! {
                    // `biased` polls the branches in declaration order.
                    biased;
                    maybe_sandbox = msg_rx.recv(), if !sandbox_closed => {
                        let Some(sandbox_msg) = maybe_sandbox else {
                            sandbox_closed = true;
                            continue;
                        };
                        if sandbox_msg.kind != "lashlang_code" && !relay_tx.is_closed() {
                            let _ = relay_tx
                                .send(RuntimeStreamEvent::Session(SessionEvent::Message {
                                    text: sandbox_msg.text,
                                    kind: sandbox_msg.kind,
                                }))
                                .await;
                        }
                    }
                    maybe_event = session_event_rx.recv(), if !session_closed => {
                        let Some(event) = maybe_event else {
                            session_closed = true;
                            continue;
                        };
                        send_session_event(&relay_tx, event).await;
                    }
                    maybe_turn_event = turn_event_rx.recv(), if !turn_closed => {
                        let Some(event) = maybe_turn_event else {
                            turn_closed = true;
                            continue;
                        };
                        let _ = relay_tx.send(RuntimeStreamEvent::Turn(event)).await;
                    }
                }
            }
        });
        let code_executor = self.session.plugins().code_executor();
        let read_view = self.checkpoint_state_view(messages, protocol_iteration);
        let chronological_projection = read_view.shared_chronological_projection();
        // Deliberately no `?` here: a context failure must still reach the teardown below.
        let context = self
            .execution_context(session_event_tx.clone(), chronological_projection)
            .map_err(|err| err.to_string());
        let result = match context {
            Ok(context) => {
                let context = context
                    .with_turn_event_sender(turn_event_tx.clone())
                    .with_lashlang_execution_trace(
                        self.host.core.tracing.lashlang_execution_sink.clone(),
                        self.host.core.tracing.trace_context.clone(),
                    )
                    .with_parent_invocation(invocation);
                match code_executor {
                    Some(code_executor) => code_executor
                        .execute_code(
                            context,
                            crate::ExecRequest {
                                code: code.to_string(),
                                accept_finish: true,
                            },
                        )
                        .await
                        .map_err(|e| e.to_string()),
                    None => {
                        // Release the context before the relay is awaited below.
                        drop(context);
                        Err(crate::SessionError::CodeExecutionUnavailable.to_string())
                    }
                }
            }
            Err(message) => Err(message),
        };
        // Shared teardown: close every source channel so the relay loop can finish.
        drop(session_event_tx);
        drop(turn_event_tx);
        self.session.clear_message_sender();
        let _ = relay_handle.await;
        result
    }
}