use super::*;
impl MeerkatMachine {
pub(super) async fn cancel_after_boundary_inner(
&self,
session_id: &SessionId,
) -> Result<(), RuntimeDriverError> {
let staged = match self
.stage_session_dsl_transition(
session_id,
crate::meerkat_machine::dsl::MeerkatMachineInput::CancelAfterBoundary {
reason: "boundary cancel".to_string(),
},
"CancelAfterBoundary",
)
.await
{
Ok(staged) => staged,
Err(_) => {
return Err(RuntimeDriverError::NotReady {
state: self
.existing_session_runtime_state(session_id)
.await
.unwrap_or(RuntimeState::Destroyed),
});
}
};
let projected_effect =
crate::effect::runtime_effect_projection_from_dsl_effects(&staged.effects)
.map_err(RuntimeDriverError::Internal)?;
let (effect_tx, boundary_handle) = {
let sessions = self.sessions.read().await;
let entry = sessions
.get(session_id)
.ok_or(RuntimeDriverError::NotReady {
state: RuntimeState::Destroyed,
})?;
(entry.effect_sender(), entry.boundary_handle())
};
let Some(effect_tx) = effect_tx else {
let state = self
.existing_session_runtime_state(session_id)
.await
.unwrap_or(RuntimeState::Destroyed);
self.restore_session_dsl_state(session_id, staged.previous_state)
.await;
return Err(RuntimeDriverError::NotReady { state });
};
let reason = projected_effect.reason().to_string();
if let Some(boundary_handle) = boundary_handle {
if let Err(err) = boundary_handle.cancel_after_boundary(reason.clone()).await {
self.restore_session_dsl_state(session_id, staged.previous_state)
.await;
return Err(RuntimeDriverError::Internal(format!(
"failed to apply live boundary cancel: {err}"
)));
}
match effect_tx.try_send(projected_effect.into_effect()) {
Ok(()) => {}
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
tracing::debug!(
%session_id,
"runtime effect channel full after live boundary cancel; live wake already delivered"
);
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
tracing::warn!(
%session_id,
"runtime effect channel closed after live boundary cancel; live wake already delivered"
);
let mut sessions = self.sessions.write().await;
if let Some(entry) = sessions.get_mut(session_id) {
entry.clear_dead_attachment();
}
}
}
return Ok(());
}
if let Err(err) = effect_tx.send(projected_effect.into_effect()).await {
self.restore_session_dsl_state(session_id, staged.previous_state)
.await;
return Err(RuntimeDriverError::Internal(format!(
"failed to send runtime effect: {err}"
)));
}
Ok(())
}
pub async fn stop_runtime_executor(
&self,
session_id: &SessionId,
reason: impl Into<String>,
) -> Result<(), RuntimeDriverError> {
self.execute_meerkat_machine_command(
None,
MeerkatMachineCommand::StopRuntimeExecutor {
session_id: session_id.clone(),
reason: reason.into(),
},
)
.await
.map_err(MeerkatMachine::driver_error_from_command_error)
.map(|_| ())
}
pub(super) async fn stop_runtime_executor_inner(
&self,
session_id: &SessionId,
reason: String,
) -> Result<(), RuntimeDriverError> {
let staged = self
.stage_session_dsl_transition(
session_id,
crate::meerkat_machine::dsl::MeerkatMachineInput::StopRuntimeExecutor { reason },
"StopRuntimeExecutor",
)
.await
.map_err(|reason| RuntimeDriverError::ValidationFailed { reason })?;
let projected_effect =
crate::effect::runtime_effect_projection_from_dsl_effects(&staged.effects)
.map_err(RuntimeDriverError::Internal)?;
let (driver, completions, effect_tx) = {
let sessions = self.sessions.read().await;
let entry = sessions
.get(session_id)
.ok_or(RuntimeDriverError::NotReady {
state: RuntimeState::Destroyed,
})?;
(
entry.driver.clone(),
entry.completions.clone(),
entry.effect_sender(),
)
};
let state_before_stop = self
.existing_session_runtime_state(session_id)
.await
.unwrap_or(RuntimeState::Destroyed);
let effect = projected_effect.into_effect();
if let Some(effect_tx) = effect_tx
&& effect_tx.send(effect).await.is_ok()
{
if matches!(state_before_stop, RuntimeState::Attached) {
let _ = tokio::time::timeout(std::time::Duration::from_millis(200), async {
loop {
match self.existing_session_runtime_state(session_id).await {
Some(RuntimeState::Stopped | RuntimeState::Destroyed) => break,
Some(RuntimeState::Attached) => {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
Some(_) | None => break,
}
}
})
.await;
}
return Ok(());
}
crate::control_plane::terminalize_async_stop(&driver, Some(&completions)).await?;
let mut sessions = self.sessions.write().await;
if let Some(entry) = sessions.get_mut(session_id) {
entry.clear_dead_attachment();
}
Ok(())
}
pub async fn accept_input_and_run<T, F, Fut>(
&self,
session_id: &SessionId,
input: Input,
op: F,
) -> Result<T, RuntimeDriverError>
where
F: FnOnce(RunId, meerkat_core::lifecycle::run_primitive::RunPrimitive) -> Fut,
Fut: Future<Output = Result<(T, CoreApplyOutput), RuntimeDriverError>>,
{
let MeerkatMachineRunPrepared {
input_id,
run_id,
primitive,
} = match self
.execute_meerkat_machine_command(
None,
MeerkatMachineCommand::Prepare {
session_id: session_id.clone(),
input,
},
)
.await
.map_err(MeerkatMachine::driver_error_from_command_error)?
{
MeerkatMachineCommandResult::Prepared(prepared) => prepared,
other => {
return Err(RuntimeDriverError::Internal(format!(
"unexpected command result preparing Meerkat run: {other:?}"
)));
}
};
match op(run_id.clone(), primitive).await {
Ok((result, output)) => {
self.execute_meerkat_machine_command(
None,
MeerkatMachineCommand::Commit {
session_id: session_id.clone(),
input_id,
run_id,
output,
},
)
.await
.map_err(MeerkatMachine::driver_error_from_command_error)?;
Ok(result)
}
Err(err) => {
self.execute_meerkat_machine_command(
None,
MeerkatMachineCommand::Fail {
session_id: session_id.clone(),
run_id,
failure: MeerkatMachineRunFailure::new(
meerkat_core::TurnTerminalCauseKind::FatalFailure,
err.to_string(),
),
},
)
.await
.map_err(MeerkatMachine::driver_error_from_command_error)?;
Err(err)
}
}
}
pub async fn accept_input_with_completion(
&self,
session_id: &SessionId,
input: Input,
) -> Result<(AcceptOutcome, Option<crate::completion::CompletionHandle>), RuntimeDriverError>
{
match self
.execute_meerkat_machine_command(
None,
MeerkatMachineCommand::AcceptWithCompletion {
session_id: session_id.clone(),
input,
},
)
.await
.map_err(MeerkatMachine::driver_error_from_command_error)?
{
MeerkatMachineCommandResult::AcceptWithCompletion {
outcome,
handle,
admission_signal: _,
} => Ok((outcome, handle)),
other => Err(RuntimeDriverError::Internal(format!(
"unexpected command result for accept_input_with_completion: {other:?}"
))),
}
}
pub async fn accept_input_without_wake(
&self,
session_id: &SessionId,
input: Input,
) -> Result<AcceptOutcome, RuntimeDriverError> {
match self
.execute_meerkat_machine_command(
None,
MeerkatMachineCommand::AcceptWithoutWake {
session_id: session_id.clone(),
input,
},
)
.await
.map_err(MeerkatMachine::driver_error_from_command_error)?
{
MeerkatMachineCommandResult::AcceptOutcome(outcome) => Ok(outcome),
other => Err(RuntimeDriverError::Internal(format!(
"unexpected command result for accept_input_without_wake: {other:?}"
))),
}
}
pub async fn ops_lifecycle_registry(
&self,
session_id: &SessionId,
) -> Option<Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>> {
match self
.execute_meerkat_machine_command(
None,
MeerkatMachineCommand::OpsLifecycleRegistry {
session_id: session_id.clone(),
},
)
.await
{
Ok(MeerkatMachineCommandResult::OpsLifecycleRegistry(registry)) => registry,
Ok(_) => {
tracing::error!("ops_lifecycle_registry: unexpected command result variant");
None
}
Err(_) => None,
}
}
pub async fn prepare_bindings(
&self,
session_id: SessionId,
) -> Result<meerkat_core::SessionRuntimeBindings, RuntimeBindingsError> {
match self
.execute_meerkat_machine_command(
None,
MeerkatMachineCommand::PrepareBindings {
session_id: session_id.clone(),
},
)
.await
{
Ok(MeerkatMachineCommandResult::Bindings(bindings)) => Ok(bindings),
Ok(_) => {
tracing::error!("prepare_bindings: unexpected command result variant");
Err(RuntimeBindingsError::SessionNotFound(session_id))
}
Err(_) => Err(RuntimeBindingsError::SessionNotFound(session_id)),
}
}
pub async fn prepare_local_session_bindings(
&self,
session_id: SessionId,
) -> Result<meerkat_core::SessionRuntimeBindings, RuntimeBindingsError> {
match self
.execute_meerkat_machine_command(
None,
MeerkatMachineCommand::PrepareLocalSessionBindings {
session_id: session_id.clone(),
},
)
.await
{
Ok(MeerkatMachineCommandResult::Bindings(bindings)) => Ok(bindings),
Ok(_) => {
tracing::error!(
"prepare_local_session_bindings: unexpected command result variant"
);
Err(RuntimeBindingsError::SessionNotFound(session_id))
}
Err(_) => Err(RuntimeBindingsError::SessionNotFound(session_id)),
}
}
}