use super::*;
impl RuntimeTurnDriver<'_> {
pub(super) async fn invoke_turn_checkpoint_effect(
&mut self,
machine: &mut TurnMachine,
id: crate::sansio::EffectId,
checkpoint: CheckpointKind,
event_tx: &mpsc::Sender<RuntimeStreamEvent>,
cancel: &CancellationToken,
) -> Result<crate::CheckpointDelivery, RuntimeError> {
let invocation = self
.turn_effect_invocation(machine, id, RuntimeEffectKind::Checkpoint)
.map_err(RuntimeEffectControllerError::into_runtime_error)?;
self.execute_typed_turn_effect(
machine,
event_tx,
cancel,
RuntimeEffectEnvelope::new(invocation, RuntimeEffectCommand::Checkpoint { checkpoint }),
RuntimeEffectOutcome::into_checkpoint,
)
.await
.and_then(|result| result)
.map_err(RuntimeEffectControllerError::into_runtime_error)
}
pub(super) async fn invoke_turn_execution_surface_sync_effect(
&mut self,
machine: &mut TurnMachine,
id: crate::sansio::EffectId,
update_machine_config: bool,
event_tx: &mpsc::Sender<RuntimeStreamEvent>,
cancel: &CancellationToken,
) -> Result<
Result<Option<crate::sansio::ExecutionSurfaceSync>, String>,
RuntimeEffectControllerError,
> {
let invocation =
self.turn_effect_invocation(machine, id, RuntimeEffectKind::SyncExecutionSurface)?;
self.execute_typed_turn_effect(
machine,
event_tx,
cancel,
RuntimeEffectEnvelope::new(
invocation,
RuntimeEffectCommand::SyncExecutionSurface {
update_machine_config,
},
),
RuntimeEffectOutcome::into_sync_execution_surface,
)
.await
}
pub(super) async fn invoke_turn_exec_effect(
&mut self,
machine: &mut TurnMachine,
invocation: crate::RuntimeInvocation,
code: String,
event_tx: &mpsc::Sender<RuntimeStreamEvent>,
cancel: &CancellationToken,
) -> Result<Result<crate::ExecResponse, String>, RuntimeEffectControllerError> {
self.execute_typed_turn_effect(
machine,
event_tx,
cancel,
RuntimeEffectEnvelope::new(invocation, RuntimeEffectCommand::ExecCode { code }),
RuntimeEffectOutcome::into_exec_code,
)
.await
}
pub(in crate::runtime) async fn run_checkpoint(
&mut self,
machine: &mut TurnMachine,
checkpoint: CheckpointKind,
event_tx: &mpsc::Sender<RuntimeStreamEvent>,
) -> Result<crate::CheckpointDelivery, RuntimeError> {
let mut committed = self.checkpoint_messages.drain().map_err(|err| {
RuntimeError::new(
RuntimeErrorCode::Other("checkpoint_messages".to_string()),
err,
)
})?;
let mut transient_messages = Vec::new();
let mut turn_causes = Vec::new();
let queue_claim = if let Some(store) = self.session.history_store() {
store
.claim_ready_queued_work(
&self.session_id,
&self.turn_id,
crate::QueuedWorkClaimBoundary::ActiveTurnCheckpoint,
crate::QUEUED_WORK_CLAIM_TTL_MS,
64,
)
.await
.map_err(|err| {
RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
})?
} else {
None
};
if let Some(claim) = queue_claim {
let accepted_turn_inputs = claim.accepted_turn_inputs();
let materialized = claim
.materialize_for_checkpoint_with_attachments(
self.host.core.durability.attachment_store.as_ref(),
)
.map_err(|err| {
RuntimeError::new(
RuntimeErrorCode::DurableStoreRequired {
facet: crate::DurableStoreFacet::AttachmentStore,
},
err,
)
})?;
send_queued_work_started_event(
event_tx,
crate::QueuedWorkClaimBoundary::ActiveTurnCheckpoint,
&claim,
materialized.turn_causes.clone(),
)
.await;
self.emit_trace(
machine.protocol_iteration(),
lash_trace::TraceEvent::Custom {
name: "queued_work.claimed".to_string(),
payload: queued_work_trace_payload(
crate::QueuedWorkClaimBoundary::ActiveTurnCheckpoint,
&claim,
&materialized.turn_causes,
),
},
);
committed.extend(materialized.messages);
transient_messages.extend(materialized.transient_messages);
turn_causes.extend(materialized.turn_causes);
if !accepted_turn_inputs.is_empty() {
send_session_event(
event_tx,
SessionEvent::InjectedTurnInputAccepted {
inputs: accepted_turn_inputs,
checkpoint,
},
)
.await;
}
self.pending_queue_claims.push(claim);
}
let plugins = Arc::clone(self.session.plugins());
let applied = plugins
.apply_checkpoint(CheckpointHookContext {
session_id: self.session_id.clone(),
checkpoint,
state: self.checkpoint_state_view(
machine.message_sequence(),
machine.protocol_iteration(),
),
sessions: self.session_services.state_service(),
session_lifecycle: self.session_services.lifecycle_service(),
session_graph: self.session_services.graph_service(),
})
.await
.map_err(|err| {
RuntimeError::new(RuntimeErrorCode::PluginCheckpoint, err.to_string())
})?;
committed.extend(applied.messages);
emit_session_events(event_tx, applied.events).await;
if let Some(abort) = applied.abort {
return Err(RuntimeError::new(abort.code, abort.message));
}
if !committed.is_empty() {
send_session_event(
event_tx,
SessionEvent::InjectedMessagesCommitted {
messages: committed.clone(),
checkpoint,
},
)
.await;
}
Ok(crate::CheckpointDelivery {
messages: committed,
transient_messages,
turn_causes,
})
}
pub(in crate::runtime) async fn run_exec_code(
&mut self,
code: &str,
messages: crate::MessageSequence,
protocol_iteration: usize,
invocation: crate::RuntimeInvocation,
event_tx: &mpsc::Sender<RuntimeStreamEvent>,
) -> Result<crate::ExecResponse, String> {
let (session_event_tx, mut session_event_rx) = mpsc::channel::<SessionEvent>(100);
let (turn_event_tx, mut turn_event_rx) = mpsc::channel::<TurnActivity>(100);
let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::<SandboxMessage>();
self.session.set_message_sender(msg_tx);
let relay_tx = event_tx.clone();
let relay_handle = tokio::spawn(async move {
let mut sandbox_closed = false;
let mut session_closed = false;
let mut turn_closed = false;
while !(sandbox_closed && session_closed && turn_closed) {
tokio::select! {
biased;
maybe_sandbox = msg_rx.recv(), if !sandbox_closed => {
let Some(sandbox_msg) = maybe_sandbox else {
sandbox_closed = true;
continue;
};
if sandbox_msg.kind != "lashlang_code" && !relay_tx.is_closed() {
let _ = relay_tx
.send(RuntimeStreamEvent::Session(SessionEvent::Message {
text: sandbox_msg.text,
kind: sandbox_msg.kind,
}))
.await;
}
}
maybe_event = session_event_rx.recv(), if !session_closed => {
let Some(event) = maybe_event else {
session_closed = true;
continue;
};
send_session_event(&relay_tx, event).await;
}
maybe_turn_event = turn_event_rx.recv(), if !turn_closed => {
let Some(event) = maybe_turn_event else {
turn_closed = true;
continue;
};
let _ = relay_tx.send(RuntimeStreamEvent::Turn(event)).await;
}
}
}
});
let code_executor = self.session.plugins().code_executor();
let read_view = self.checkpoint_state_view(messages, protocol_iteration);
let chronological_projection = read_view.shared_chronological_projection();
let context = self
.execution_context(session_event_tx.clone(), chronological_projection)
.map_err(|err| err.to_string())?
.with_turn_event_sender(turn_event_tx.clone())
.with_lashlang_execution_trace(
self.host.core.tracing.lashlang_execution_sink.clone(),
self.host.core.tracing.trace_context.clone(),
);
let context = context.with_parent_invocation(invocation);
let result = match code_executor {
Some(code_executor) => code_executor
.execute_code(
context,
crate::ExecRequest {
code: code.to_string(),
accept_finish: true,
},
)
.await
.map_err(|e| e.to_string()),
None => {
drop(context);
Err(crate::SessionError::CodeExecutionUnavailable.to_string())
}
};
drop(session_event_tx);
drop(turn_event_tx);
self.session.clear_message_sender();
let _ = relay_handle.await;
result
}
}