use meerkat_core::lifecycle::core_executor::{
CoreApplyFailureCause, CoreApplyTerminal, CoreExecutorError,
};
use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive, StagedRunInput};
use meerkat_core::lifecycle::{InputId, RunId};
use meerkat_core::turn_execution_authority::ContentShape as TurnContentShape;
use crate::input::Input;
#[cfg(test)]
use crate::input::input_prompt_text;
#[cfg(test)]
use crate::input::runtime_input_projection_for_machine_batch;
use crate::runtime_state::RuntimeState;
use crate::tokio;
#[cfg(test)]
pub(crate) fn input_to_prompt(input: &Input) -> String {
input_prompt_text(input)
}
pub(crate) fn for_input(
input: &Input,
semantics: crate::ingress_types::RuntimeInputSemantics,
) -> meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata {
use meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata;
let mut metadata = match input {
Input::Prompt(prompt) => prompt.turn_metadata.clone().unwrap_or_default(),
Input::FlowStep(flow_step) => flow_step.turn_metadata.clone().unwrap_or_default(),
Input::ExternalEvent(event) => RuntimeTurnMetadata {
handling_mode: Some(event.handling_mode),
render_metadata: event.render_metadata.clone(),
..Default::default()
},
Input::Continuation(continuation) => RuntimeTurnMetadata {
handling_mode: Some(continuation.handling_mode),
..Default::default()
},
Input::Peer(peer) => RuntimeTurnMetadata {
handling_mode: peer.handling_mode,
..Default::default()
},
_ => RuntimeTurnMetadata::default(),
};
metadata.execution_kind = Some(semantics.execution_kind);
metadata.peer_response_terminal_apply_intent = semantics.peer_response_terminal_apply_intent;
metadata
}
pub(crate) fn merge_batch_turn_metadata(
inputs: &[(meerkat_core::lifecycle::InputId, Input)],
semantics: &[crate::ingress_types::RuntimeInputSemantics],
) -> Result<
Option<meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata>,
meerkat_core::lifecycle::run_primitive::TurnMetadataMergeConflict,
> {
use meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata;
if inputs.len() != semantics.len() {
return Err(
meerkat_core::lifecycle::run_primitive::TurnMetadataMergeConflict {
field: "execution_kind",
reason: "runtime-stamped execution kind missing for one or more inputs",
},
);
}
let mut acc: Option<RuntimeTurnMetadata> = None;
for ((_, input), semantics) in inputs.iter().zip(semantics.iter()) {
let meta = for_input(input, *semantics);
match acc.as_mut() {
None => acc = Some(meta),
Some(existing) => {
existing.merge(meta)?;
}
}
}
Ok(acc.filter(|m| !m.is_empty()))
}
fn resolve_completion_waiters(
registry: &mut crate::completion::CompletionRegistry,
input_ids: &[InputId],
terminal: Option<CoreApplyTerminal>,
) {
match terminal {
Some(CoreApplyTerminal::CallbackPending { tool_name, args }) => {
for input_id in input_ids {
registry.resolve_callback_pending(input_id, tool_name.clone(), args.clone());
}
}
Some(CoreApplyTerminal::RunResult(result)) => {
for input_id in input_ids {
registry.resolve_completed(input_id, result.as_ref().clone());
}
}
Some(CoreApplyTerminal::NoPendingBoundary) | None => {
for input_id in input_ids {
registry.resolve_without_result(input_id);
}
}
}
}
fn resolve_completion_waiters_with_finalization_failure(
registry: &mut crate::completion::CompletionRegistry,
input_ids: &[InputId],
terminal: Option<CoreApplyTerminal>,
error: meerkat_core::TurnErrorMetadata,
) {
match terminal {
Some(CoreApplyTerminal::RunResult(result)) => {
for input_id in input_ids {
registry.resolve_completed_with_finalization_failure(
input_id,
result.as_ref().clone(),
error.clone(),
);
}
}
_ => {
let reason = error
.detail
.clone()
.unwrap_or_else(|| "runtime finalization failed".to_string());
abandon_completion_waiters_with_error(registry, input_ids, reason, error);
}
}
}
async fn stop_runtime_loop_executor_from_dsl_effect(
driver: &crate::meerkat_machine::SharedDriver,
completions: Option<&crate::meerkat_machine::SharedCompletionRegistry>,
executor: &mut dyn meerkat_core::lifecycle::CoreExecutor,
reason: String,
) -> bool {
let authority = {
let driver = driver.lock().await;
driver.shared_dsl_authority()
};
let effects = match crate::meerkat_machine::apply_dsl_transition_on_authority(
&authority,
crate::meerkat_machine::dsl::MeerkatMachineInput::StopRuntimeExecutor { reason },
"RuntimeLoopStopRuntimeExecutor",
) {
Ok(effects) => effects,
Err(error) => {
tracing::error!(
error = %error,
"failed to apply DSL stop-runtime-executor transition after runtime loop snapshot failure"
);
return true;
}
};
let projected_effect = match crate::effect::runtime_effect_projection_from_dsl_effects(&effects)
{
Ok(effect) => effect,
Err(error) => {
tracing::error!(
error = %error,
"DSL stop-runtime-executor transition did not emit a runtime effect fact"
);
return true;
}
};
match crate::control_plane::apply_executor_effect(
driver,
completions,
executor,
projected_effect.into_effect(),
)
.await
{
Ok(should_stop) => should_stop,
Err(error) => {
tracing::error!(
error = %error,
"failed to apply stop-runtime-executor effect from runtime loop"
);
true
}
}
}
fn abandon_completion_waiters(
registry: &mut crate::completion::CompletionRegistry,
input_ids: &[InputId],
reason: impl Into<String>,
) {
let reason = reason.into();
for input_id in input_ids {
registry.resolve_abandoned(input_id, reason.clone());
}
}
fn abandon_completion_waiters_with_error(
registry: &mut crate::completion::CompletionRegistry,
input_ids: &[InputId],
reason: impl Into<String>,
error: meerkat_core::TurnErrorMetadata,
) {
let reason = reason.into();
for input_id in input_ids {
registry.resolve_abandoned_with_error(input_id, reason.clone(), error.clone());
}
}
fn primitive_admitted_content_shape(primitive: &RunPrimitive) -> TurnContentShape {
match primitive {
RunPrimitive::StagedInput(staged) => TurnContentShape::from_staged_presence(
!staged.appends.is_empty(),
!staged.context_appends.is_empty(),
),
RunPrimitive::ImmediateAppend(_) => TurnContentShape::ImmediateAppend,
RunPrimitive::ImmediateContextAppend(_) => TurnContentShape::ImmediateContext,
_ => TurnContentShape::Conversation,
}
}
fn primitive_turn_start_input(
run_id: &RunId,
primitive: &RunPrimitive,
) -> Option<crate::meerkat_machine::dsl::MeerkatMachineInput> {
match primitive {
RunPrimitive::ImmediateAppend(_) => Some(
crate::meerkat_machine::dsl::MeerkatMachineInput::StartImmediateAppend {
run_id: crate::meerkat_machine::dsl::RunId::from_domain(run_id),
},
),
RunPrimitive::ImmediateContextAppend(_) => Some(
crate::meerkat_machine::dsl::MeerkatMachineInput::StartImmediateContext {
run_id: crate::meerkat_machine::dsl::RunId::from_domain(run_id),
},
),
RunPrimitive::StagedInput(_) if primitive.is_peer_response_terminal_context_and_run() => {
let admitted_content_shape = crate::meerkat_machine::dsl::ContentShape::from(
primitive_admitted_content_shape(primitive),
);
Some(
crate::meerkat_machine::dsl::MeerkatMachineInput::StartConversationRun {
run_id: crate::meerkat_machine::dsl::RunId::from_domain(run_id),
primitive_kind:
crate::meerkat_machine::dsl::TurnPrimitiveKind::ConversationTurn,
admitted_content_shape,
vision_enabled: false,
image_tool_results_enabled: false,
max_extraction_retries: 0,
},
)
}
RunPrimitive::StagedInput(_) if primitive.is_context_only_apply_without_turn() => Some(
crate::meerkat_machine::dsl::MeerkatMachineInput::StartImmediateContext {
run_id: crate::meerkat_machine::dsl::RunId::from_domain(run_id),
},
),
RunPrimitive::StagedInput(staged) if staged.appends.is_empty() => None,
RunPrimitive::StagedInput(_) => {
let admitted_content_shape = crate::meerkat_machine::dsl::ContentShape::from(
primitive_admitted_content_shape(primitive),
);
Some(
crate::meerkat_machine::dsl::MeerkatMachineInput::StartConversationRun {
run_id: crate::meerkat_machine::dsl::RunId::from_domain(run_id),
primitive_kind:
crate::meerkat_machine::dsl::TurnPrimitiveKind::ConversationTurn,
admitted_content_shape,
vision_enabled: false,
image_tool_results_enabled: false,
max_extraction_retries: 0,
},
)
}
_ => {
let admitted_content_shape = crate::meerkat_machine::dsl::ContentShape::from(
primitive_admitted_content_shape(primitive),
);
Some(
crate::meerkat_machine::dsl::MeerkatMachineInput::StartConversationRun {
run_id: crate::meerkat_machine::dsl::RunId::from_domain(run_id),
primitive_kind:
crate::meerkat_machine::dsl::TurnPrimitiveKind::ConversationTurn,
admitted_content_shape,
vision_enabled: false,
image_tool_results_enabled: false,
max_extraction_retries: 0,
},
)
}
}
}
async fn prepare_turn_state_for_primitive(
driver: &crate::meerkat_machine::SharedDriver,
run_id: &RunId,
primitive: &RunPrimitive,
) -> Result<(), crate::RuntimeDriverError> {
if let Some(reason) = primitive.peer_response_terminal_apply_intent_violation() {
return Err(crate::RuntimeDriverError::Internal(reason.to_string()));
}
let Some(input) = primitive_turn_start_input(run_id, primitive) else {
return Ok(());
};
let authority = {
let driver = driver.lock().await;
driver.shared_dsl_authority()
};
let mut auth = authority
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if auth.state.lifecycle_phase == crate::meerkat_machine::dsl::MeerkatPhase::Retired {
return Ok(());
}
crate::meerkat_machine::dsl::MeerkatMachineMutator::apply(&mut *auth, input)
.map(|_| ())
.map_err(|err| {
crate::RuntimeDriverError::Internal(format!(
"failed to start runtime turn state for run {run_id}: {err}"
))
})
}
#[cfg(test)]
pub(crate) fn try_inputs_to_primitive_with_boundary(
inputs: &[(InputId, Input)],
boundary: RunApplyBoundary,
semantics: &[crate::ingress_types::RuntimeInputSemantics],
) -> Result<RunPrimitive, meerkat_core::lifecycle::run_primitive::TurnMetadataMergeConflict> {
let projections = inputs
.iter()
.map(|(_, input)| runtime_input_projection_for_machine_batch(input))
.collect::<Vec<_>>();
try_projected_inputs_to_primitive_with_boundary(inputs, &projections, boundary, semantics)
}
pub(crate) fn try_projected_inputs_to_primitive_with_boundary(
inputs: &[(InputId, Input)],
projections: &[crate::ingress_types::RuntimeInputProjection],
boundary: RunApplyBoundary,
semantics: &[crate::ingress_types::RuntimeInputSemantics],
) -> Result<RunPrimitive, meerkat_core::lifecycle::run_primitive::TurnMetadataMergeConflict> {
let appends = projections
.iter()
.flat_map(|projection| {
projection
.append
.clone()
.into_iter()
.chain(projection.additional_appends.clone())
})
.collect::<Vec<_>>();
let context_appends = projections
.iter()
.filter_map(|projection| projection.context_append.clone())
.collect::<Vec<_>>();
let contributing_input_ids = inputs
.iter()
.map(|(input_id, _)| input_id.clone())
.collect::<Vec<_>>();
let turn_metadata = merge_batch_turn_metadata(inputs, semantics)?;
Ok(RunPrimitive::StagedInput(StagedRunInput {
boundary,
appends,
context_appends,
contributing_input_ids,
turn_metadata,
}))
}
#[cfg(test)]
pub(crate) fn inputs_to_primitive_with_boundary(
inputs: &[(InputId, Input)],
boundary: RunApplyBoundary,
) -> Result<RunPrimitive, meerkat_core::lifecycle::run_primitive::TurnMetadataMergeConflict> {
let semantics = fallback_batch_semantics(inputs);
try_inputs_to_primitive_with_boundary(inputs, boundary, &semantics)
}
#[cfg(test)]
pub(crate) fn inputs_to_primitive(
inputs: &[(InputId, Input)],
) -> Result<RunPrimitive, meerkat_core::lifecycle::run_primitive::TurnMetadataMergeConflict> {
let boundary = inputs
.first()
.map(|(_, input)| fallback_unadmitted_semantics(input).boundary)
.unwrap_or(RunApplyBoundary::RunStart);
inputs_to_primitive_with_boundary(inputs, boundary)
}
#[cfg(test)]
fn fallback_unadmitted_semantics(input: &Input) -> crate::ingress_types::RuntimeInputSemantics {
let policy = crate::policy_table::DefaultPolicyTable::resolve(input, true);
crate::ingress_types::RuntimeInputSemantics::from_policy_and_kind(&policy, input.kind())
}
#[cfg(test)]
fn fallback_batch_semantics(
inputs: &[(InputId, Input)],
) -> Vec<crate::ingress_types::RuntimeInputSemantics> {
inputs
.iter()
.map(|(_, input)| fallback_unadmitted_semantics(input))
.collect()
}
#[cfg(test)]
pub(crate) fn input_to_primitive(
input: &Input,
input_id: InputId,
) -> Result<RunPrimitive, meerkat_core::lifecycle::run_primitive::TurnMetadataMergeConflict> {
inputs_to_primitive(&[(input_id, input.clone())])
}
pub(crate) fn admitted_input_to_primitive(
input: &Input,
input_id: InputId,
projection: crate::ingress_types::RuntimeInputProjection,
semantics: crate::ingress_types::RuntimeInputSemantics,
) -> Result<RunPrimitive, meerkat_core::lifecycle::run_primitive::TurnMetadataMergeConflict> {
try_projected_inputs_to_primitive_with_boundary(
&[(input_id, input.clone())],
&[projection],
semantics.boundary,
&[semantics],
)
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn spawn_runtime_loop_with_completions(
driver: crate::meerkat_machine::SharedDriver,
mut executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
mut wake_rx: tokio::sync::mpsc::Receiver<()>,
mut effect_rx: tokio::sync::mpsc::Receiver<crate::effect::RuntimeEffect>,
completions: Option<crate::meerkat_machine::SharedCompletionRegistry>,
completion_feed: Option<std::sync::Arc<dyn meerkat_core::completion_feed::CompletionFeed>>,
epoch_cursor_state: Option<std::sync::Arc<meerkat_core::EpochCursorState>>,
_machine_weak: std::sync::Weak<crate::meerkat_machine::MeerkatMachine>,
_session_id: meerkat_core::types::SessionId,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let initial_watermark = epoch_cursor_state
.as_ref()
.map(|cs| {
let obs = cs
.runtime_observed_seq
.load(std::sync::atomic::Ordering::Acquire);
let inj = cs
.runtime_last_injected_seq
.load(std::sync::atomic::Ordering::Acquire);
obs.max(inj)
})
.unwrap_or_else(|| completion_feed.as_ref().map(|f| f.watermark()).unwrap_or(0));
let mut observed_seq: meerkat_core::completion_feed::CompletionSeq = initial_watermark;
let mut last_injected_seq: meerkat_core::completion_feed::CompletionSeq =
epoch_cursor_state
.as_ref()
.map(|cs| {
cs.runtime_last_injected_seq
.load(std::sync::atomic::Ordering::Acquire)
})
.filter(|&v| v > 0)
.unwrap_or(initial_watermark);
loop {
let idle_wake = async {
if let Some(ref feed) = completion_feed {
feed.wait_for_advance(observed_seq).await;
} else {
std::future::pending::<()>().await;
}
};
tokio::select! {
biased;
maybe_effect = effect_rx.recv() => {
match maybe_effect {
Some(effect) => {
match crate::control_plane::apply_executor_effect(
&driver,
completions.as_ref(),
&mut *executor,
effect,
)
.await
{
Ok(true) => break,
Ok(false) => {}
Err(error) => {
tracing::error!(
error = %error,
"failed to apply runtime executor effect"
);
break;
}
}
}
None => break,
}
}
maybe_wake = wake_rx.recv() => {
match maybe_wake {
Some(()) => {
if process_queue(
&driver,
&mut *executor,
&mut effect_rx,
completions.as_ref(),
)
.await
{
break;
}
if maybe_inject_feed_wake(
&driver,
completion_feed.as_deref(),
&mut observed_seq,
&mut last_injected_seq,
epoch_cursor_state.as_deref(),
)
.await
&& process_queue(
&driver,
&mut *executor,
&mut effect_rx,
completions.as_ref(),
)
.await
{
break;
}
}
None => break,
}
}
() = idle_wake => {
if let Some(ref feed) = completion_feed {
let batch = feed.list_since(observed_seq);
let has_new_bg_completion = batch.entries.iter().any(|e| {
e.kind
== meerkat_core::ops_lifecycle::OperationKind::BackgroundToolOp
&& e.seq > last_injected_seq
});
if has_new_bg_completion {
let d = driver.lock().await;
let quiescent = d.is_quiescent_for_detached_wake();
drop(d);
if quiescent {
let input = crate::input::Input::Continuation(
crate::input::ContinuationInput::detached_background_op_completed(),
);
let mut d = driver.lock().await;
if d.as_driver_mut().accept_input(input).await.is_ok() {
last_injected_seq = batch.watermark;
if let Some(ref cs) = epoch_cursor_state {
cs.runtime_last_injected_seq.store(batch.watermark, std::sync::atomic::Ordering::Release);
}
}
observed_seq = batch.watermark;
if let Some(ref cs) = epoch_cursor_state {
cs.runtime_observed_seq.store(batch.watermark, std::sync::atomic::Ordering::Release);
}
drop(d);
if process_queue(
&driver,
&mut *executor,
&mut effect_rx,
completions.as_ref(),
)
.await
{
break;
}
}
} else {
observed_seq = batch.watermark;
if let Some(ref cs) = epoch_cursor_state {
cs.runtime_observed_seq.store(batch.watermark, std::sync::atomic::Ordering::Release);
}
}
}
}
}
}
if let Some(ref completions) = completions {
let mut reg = completions.lock().await;
reg.resolve_all_terminated("runtime loop exited");
}
})
}
async fn maybe_inject_feed_wake(
driver: &crate::meerkat_machine::SharedDriver,
feed: Option<&dyn meerkat_core::completion_feed::CompletionFeed>,
observed_seq: &mut meerkat_core::completion_feed::CompletionSeq,
last_injected_seq: &mut meerkat_core::completion_feed::CompletionSeq,
epoch_cursor_state: Option<&meerkat_core::EpochCursorState>,
) -> bool {
let Some(feed) = feed else {
return false;
};
let batch = feed.list_since(*observed_seq);
let has_new_bg_completion = batch.entries.iter().any(|e| {
e.kind == meerkat_core::ops_lifecycle::OperationKind::BackgroundToolOp
&& e.seq > *last_injected_seq
});
if !has_new_bg_completion {
*observed_seq = batch.watermark;
if let Some(cs) = epoch_cursor_state {
cs.runtime_observed_seq
.store(batch.watermark, std::sync::atomic::Ordering::Release);
}
return false;
}
let d = driver.lock().await;
if !d.is_quiescent_for_detached_wake() {
return false;
}
drop(d);
let input = crate::input::Input::Continuation(
crate::input::ContinuationInput::detached_background_op_completed(),
);
let mut d = driver.lock().await;
if d.as_driver_mut().accept_input(input).await.is_ok() {
*last_injected_seq = batch.watermark;
if let Some(cs) = epoch_cursor_state {
cs.runtime_last_injected_seq
.store(batch.watermark, std::sync::atomic::Ordering::Release);
}
}
*observed_seq = batch.watermark;
if let Some(cs) = epoch_cursor_state {
cs.runtime_observed_seq
.store(batch.watermark, std::sync::atomic::Ordering::Release);
}
true
}
#[allow(clippy::too_many_arguments)]
async fn process_queue(
driver: &crate::meerkat_machine::SharedDriver,
executor: &mut dyn meerkat_core::lifecycle::CoreExecutor,
effect_rx: &mut tokio::sync::mpsc::Receiver<crate::effect::RuntimeEffect>,
completions: Option<&crate::meerkat_machine::SharedCompletionRegistry>,
) -> bool {
loop {
match crate::control_plane::drain_ready_executor_effects(
driver,
completions,
executor,
effect_rx,
)
.await
{
Ok(true) => return true,
Ok(false) => {}
Err(error) => {
tracing::error!(
error = %error,
"failed to drain runtime executor effect"
);
return true;
}
}
let dequeued = {
let mut d = driver.lock().await;
let prebound_run_id = if d.runtime_state() == RuntimeState::Running {
d.current_run_id()
} else {
None
};
if !d.can_process_queue() && prebound_run_id.is_none() {
return false;
}
let batch_ids = crate::meerkat_machine::machine_select_runtime_loop_batch(&d);
if batch_ids.is_empty() {
return false;
}
let staged_inputs: Vec<_> = batch_ids
.iter()
.filter_map(|id| d.dequeue_by_id(id))
.collect();
if staged_inputs.is_empty() {
return false;
}
let run_id = prebound_run_id.unwrap_or_else(RunId::new);
let staged_ids: Vec<_> = staged_inputs.iter().map(|(id, _)| id.clone()).collect();
let boundary = staged_inputs
.first()
.map(|(id, _)| crate::meerkat_machine::machine_input_boundary(&d, id))
.unwrap_or(RunApplyBoundary::RunStart);
let contributing_input_ids = staged_inputs
.iter()
.map(|(staged_input_id, _)| staged_input_id.clone())
.collect::<Vec<_>>();
let semantics =
crate::meerkat_machine::machine_batch_runtime_semantics(&d, &staged_ids);
let projections =
crate::meerkat_machine::machine_batch_primitive_projections(&d, &staged_inputs);
let primitive = match semantics {
Some(semantics) => try_projected_inputs_to_primitive_with_boundary(
&staged_inputs,
&projections,
boundary,
&semantics,
),
None => Err(
meerkat_core::lifecycle::run_primitive::TurnMetadataMergeConflict {
field: "execution_kind",
reason: "runtime-stamped execution kind missing for one or more inputs",
},
),
};
Some((contributing_input_ids, staged_ids, run_id, primitive))
};
match dequeued {
Some((input_ids, staged_ids, run_id, primitive)) => {
if let Err(err) = crate::meerkat_machine::prepare_runtime_loop_batch_start(
driver,
run_id.clone(),
&staged_ids,
)
.await
{
tracing::error!(%run_id, error = %err, "failed to prepare runtime loop batch");
if let Some(completions) = completions.as_ref() {
let mut completions = completions.lock().await;
abandon_completion_waiters(
&mut completions,
&input_ids,
format!("runtime batch preparation failed: {err}"),
);
}
return false;
}
let primitive = match primitive {
Ok(primitive) => primitive,
Err(conflict) => {
tracing::error!(
%run_id,
field = conflict.field,
reason = conflict.reason,
"batch turn-metadata merge conflict"
);
if let Err(err) = crate::meerkat_machine::fail_runtime_loop_run(
driver,
run_id,
CoreApplyFailureCause::primitive_rejected(conflict.to_string()),
)
.await
{
tracing::error!(error = %err, "failed to record primitive rejection terminal event");
let should_stop = stop_runtime_loop_executor_from_dsl_effect(
driver,
completions,
executor,
format!("runtime primitive rejection snapshot failed: {err}"),
)
.await;
if let Some(completions) = completions.as_ref() {
let mut completions = completions.lock().await;
abandon_completion_waiters(
&mut completions,
&input_ids,
format!("runtime primitive rejection snapshot failed: {err}"),
);
}
return should_stop;
}
if let Some(completions) = completions.as_ref() {
let mut completions = completions.lock().await;
abandon_completion_waiters(
&mut completions,
&input_ids,
format!("runtime primitive rejected: {conflict}"),
);
}
return false;
}
};
if let Err(error) =
prepare_turn_state_for_primitive(driver, &run_id, &primitive).await
{
tracing::error!(%run_id, error = %error, "failed to start runtime turn state");
if let Err(err) = crate::meerkat_machine::fail_runtime_loop_run(
driver,
run_id,
CoreApplyFailureCause::executor_internal(error.to_string()),
)
.await
{
tracing::error!(error = %err, "failed to record turn-state preparation terminal event");
let should_stop = stop_runtime_loop_executor_from_dsl_effect(
driver,
completions,
executor,
format!("runtime turn-state preparation snapshot failed: {err}"),
)
.await;
if let Some(completions) = completions.as_ref() {
let mut completions = completions.lock().await;
abandon_completion_waiters(
&mut completions,
&input_ids,
format!("runtime turn-state preparation snapshot failed: {err}"),
);
}
return should_stop;
}
if let Some(completions) = completions.as_ref() {
let mut completions = completions.lock().await;
abandon_completion_waiters(
&mut completions,
&input_ids,
format!("runtime turn-state preparation failed: {error}"),
);
}
return false;
}
let result = executor.apply(run_id.clone(), primitive).await;
let d = driver.lock().await;
match result {
Ok(output) => {
let meerkat_core::lifecycle::core_executor::CoreApplyOutput {
receipt,
session_snapshot,
terminal,
} = output;
drop(d);
let committed_session_snapshot = session_snapshot.clone();
if let Err(err) = crate::meerkat_machine::commit_runtime_loop_run(
driver,
run_id.clone(),
input_ids.clone(),
receipt,
session_snapshot,
)
.await
{
tracing::error!(%run_id, error = %err, "failed to commit runtime loop run");
let completion_error =
meerkat_core::TurnErrorMetadata::runtime_apply_failure(format!(
"runtime loop commit failed: {err}"
));
if let Some(completions) = completions.as_ref() {
let mut completions = completions.lock().await;
resolve_completion_waiters_with_finalization_failure(
&mut completions,
&input_ids,
terminal,
completion_error,
);
}
let should_stop = stop_runtime_loop_executor_from_dsl_effect(
driver,
completions,
executor,
format!("runtime loop commit failed for run {run_id}: {err}"),
)
.await;
return should_stop;
}
if let Some(session_snapshot) = committed_session_snapshot.as_deref()
&& let Err(err) = executor
.checkpoint_committed_session_snapshot(session_snapshot)
.await
{
tracing::error!(
%run_id,
error = %err,
"failed to checkpoint committed runtime session snapshot"
);
let completion_error =
meerkat_core::TurnErrorMetadata::runtime_apply_failure(format!(
"runtime session checkpoint failed after commit: {err}"
));
if let Some(completions) = completions.as_ref() {
let mut completions = completions.lock().await;
resolve_completion_waiters_with_finalization_failure(
&mut completions,
&input_ids,
terminal,
completion_error,
);
}
let should_stop = stop_runtime_loop_executor_from_dsl_effect(
driver,
completions,
executor,
format!(
"runtime session checkpoint failed after commit for run {run_id}: {err}"
),
)
.await;
return should_stop;
}
if let Some(completions) = completions.as_ref() {
let mut reg = completions.lock().await;
resolve_completion_waiters(&mut reg, &input_ids, terminal);
}
}
Err(e) => {
let cancelled = e.is_cancelled();
let error_msg = e.to_string();
let terminal_failure = match &e {
CoreExecutorError::TerminalFailure {
outcome,
cause_kind,
message,
..
} => Some(
crate::meerkat_machine_types::MeerkatMachineRunFailure::terminal(
*outcome,
*cause_kind,
message.clone(),
),
),
_ => None,
};
let completion_error = match &e {
CoreExecutorError::TerminalFailure {
outcome,
cause_kind,
..
} => Some(meerkat_core::TurnErrorMetadata::terminal(
*cause_kind,
*outcome,
format!("apply failed: {error_msg}"),
)),
_ => None,
};
drop(d);
let fail_result = if cancelled {
crate::meerkat_machine::cancel_runtime_loop_run(driver, run_id).await
} else if let Some(failure) = terminal_failure {
crate::meerkat_machine::fail_machine_run(driver, run_id, failure).await
} else {
crate::meerkat_machine::fail_runtime_loop_run(
driver,
run_id,
e.apply_failure_cause(),
)
.await
};
if let Err(err) = fail_result {
tracing::error!(error = %err, "failed to record runtime terminal event");
let should_stop = stop_runtime_loop_executor_from_dsl_effect(
driver,
completions,
executor,
format!("runtime failure snapshot failed: {err}"),
)
.await;
if let Some(completions) = completions.as_ref() {
let mut completions = completions.lock().await;
abandon_completion_waiters(
&mut completions,
&input_ids,
format!("runtime failure snapshot failed: {err}"),
);
}
return should_stop;
}
if let Some(completions) = completions.as_ref() {
let mut completions = completions.lock().await;
if cancelled {
for input_id in &input_ids {
completions.resolve_cancelled(input_id);
}
} else {
let reason = format!("apply failed: {error_msg}");
if let Some(error) = completion_error {
abandon_completion_waiters_with_error(
&mut completions,
&input_ids,
reason,
error,
);
} else {
abandon_completion_waiters(
&mut completions,
&input_ids,
reason,
);
}
}
}
let mut d = driver.lock().await;
let should_continue = d.has_queued_input_outside(&input_ids);
if should_continue {
d.defer_queued_inputs_behind_backlog(&input_ids);
d.take_wake_requested();
}
drop(d);
if should_continue {
continue;
}
return false;
}
}
}
None => return false, }
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use crate::input::*;
use chrono::Utc;
use meerkat_core::lifecycle::run_primitive::{
ConversationAppend, ConversationAppendRole, CoreRenderable, PeerResponseTerminalApplyIntent,
};
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
use std::time::Duration;
use meerkat_core::ops_lifecycle::{
OperationKind, OperationResult, OperationSpec, OpsLifecycleRegistry,
};
use meerkat_core::types::SessionId;
const TEST_PEER_RESPONSE_ROUTE_ID: &str = "11111111-1111-4111-8111-111111111111";
const TEST_PEER_RESPONSE_REQUEST_ID: &str = "22222222-2222-4222-8222-222222222222";
const TEST_PEER_RESPONSE_REQUEST_ID_2: &str = "33333333-3333-4333-8333-333333333333";
fn background_spec(name: &str) -> OperationSpec {
OperationSpec {
id: meerkat_core::ops_lifecycle::OperationId::new(),
kind: OperationKind::BackgroundToolOp,
owner_session_id: SessionId::new(),
display_name: name.into(),
source_label: "runtime-loop-test".into(),
child_session_id: None,
expect_peer_channel: false,
}
}
fn op_result(id: &meerkat_core::ops_lifecycle::OperationId, content: &str) -> OperationResult {
OperationResult {
id: id.clone(),
content: content.into(),
is_error: false,
duration_ms: 42,
tokens_used: 7,
}
}
fn make_shared_ephemeral_driver(runtime_id: &str) -> crate::meerkat_machine::SharedDriver {
Arc::new(crate::tokio::sync::Mutex::new(
crate::meerkat_machine::DriverEntry::Ephemeral(
crate::driver::ephemeral::EphemeralRuntimeDriver::new(
crate::identifiers::LogicalRuntimeId::new(runtime_id),
),
),
))
}
fn stop_runtime_executor_effect(reason: &str) -> crate::effect::RuntimeEffect {
crate::effect::runtime_effect_for_test(
crate::meerkat_machine::dsl::RuntimeEffectKind::StopRuntimeExecutor,
reason,
)
}
#[tokio::test]
async fn runtime_loop_stop_effect_failure_is_fail_closed_from_helper() {
let driver = make_shared_ephemeral_driver("stop-helper-fail-closed");
let stop_calls = Arc::new(AtomicUsize::new(0));
let apply_calls = Arc::new(AtomicUsize::new(0));
let mut executor = crate::control_plane::test_support::StopFailingExecutor::new(
Arc::clone(&stop_calls),
Arc::clone(&apply_calls),
);
let should_stop = stop_runtime_loop_executor_from_dsl_effect(
&driver,
None,
&mut executor,
"snapshot failure should stop the runtime loop".to_string(),
)
.await;
assert!(
should_stop,
"stop-effect failures must fail closed by stopping the runtime loop"
);
assert_eq!(stop_calls.load(Ordering::SeqCst), 1);
assert_eq!(
apply_calls.load(Ordering::SeqCst),
0,
"stop-effect failure must not fall through to queued work"
);
}
#[tokio::test]
async fn runtime_loop_drain_effect_failure_is_fail_closed() {
let driver = make_shared_ephemeral_driver("drain-effect-fail-closed");
let stop_calls = Arc::new(AtomicUsize::new(0));
let apply_calls = Arc::new(AtomicUsize::new(0));
let mut executor = crate::control_plane::test_support::StopFailingExecutor::new(
Arc::clone(&stop_calls),
Arc::clone(&apply_calls),
);
let (effect_tx, mut effect_rx) = tokio::sync::mpsc::channel(1);
effect_tx
.send(stop_runtime_executor_effect("drain failure should stop"))
.await
.expect("test effect should enqueue");
let should_stop = process_queue(&driver, &mut executor, &mut effect_rx, None).await;
assert!(
should_stop,
"drained executor-effect failures must stop the runtime loop"
);
assert_eq!(stop_calls.load(Ordering::SeqCst), 1);
assert_eq!(
apply_calls.load(Ordering::SeqCst),
0,
"failed stop effect must not be followed by ordinary queue processing"
);
}
#[tokio::test]
async fn runtime_loop_direct_effect_failure_exits_loop_with_channels_open() {
let driver = make_shared_ephemeral_driver("direct-effect-fail-closed");
let stop_calls = Arc::new(AtomicUsize::new(0));
let apply_calls = Arc::new(AtomicUsize::new(0));
let executor = crate::control_plane::test_support::StopFailingExecutor::new(
Arc::clone(&stop_calls),
Arc::clone(&apply_calls),
);
let (wake_tx, wake_rx) = tokio::sync::mpsc::channel(1);
let (effect_tx, effect_rx) = tokio::sync::mpsc::channel(1);
let handle = spawn_runtime_loop_with_completions(
driver,
Box::new(executor),
wake_rx,
effect_rx,
None,
None,
None,
std::sync::Weak::<crate::meerkat_machine::MeerkatMachine>::new(),
SessionId::new(),
);
effect_tx
.send(stop_runtime_executor_effect(
"direct effect failure should stop",
))
.await
.expect("test effect should enqueue");
tokio::time::timeout(Duration::from_secs(1), handle)
.await
.expect("runtime loop must exit after executor-effect failure")
.expect("runtime loop task should not panic");
assert_eq!(stop_calls.load(Ordering::SeqCst), 1);
assert_eq!(apply_calls.load(Ordering::SeqCst), 0);
drop((wake_tx, effect_tx));
}
fn make_prompt(text: &str) -> Input {
Input::Prompt(PromptInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Operator,
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
text: text.into(),
blocks: None,
typed_turn_appends: Vec::new(),
turn_metadata: None,
})
}
#[test]
fn input_to_prompt_extracts_text() {
let input = make_prompt("hello world");
assert_eq!(input_to_prompt(&input), "hello world");
}
#[test]
fn input_to_prompt_peer() {
let input = Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id: "p".into(),
display_identity: Some("Peer P".into()),
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
convention: None,
body: "peer message".into(),
payload: None,
blocks: None,
handling_mode: None,
});
assert_eq!(input_to_prompt(&input), "peer message");
}
#[test]
fn input_to_prompt_peer_message_uses_body_when_projection_text_is_empty() {
let input = Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id: "peer-1".into(),
display_identity: None,
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
convention: Some(crate::input::PeerConvention::Message),
body: "plain body payload".into(),
payload: None,
blocks: None,
handling_mode: None,
});
assert_eq!(input_to_prompt(&input), "plain body payload");
}
#[test]
fn input_to_prompt_peer_request_is_runtime_owned_and_ignores_bogus_body() {
let input = Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id: "11111111-1111-4111-8111-111111111111".into(),
display_identity: None,
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
convention: Some(crate::input::PeerConvention::Request {
request_id: "req-123".into(),
intent: "checksum_token".into(),
}),
body: "stale helper-local comms prose".into(),
payload: Some(serde_json::json!({"subject": "alpha beta gamma"})),
blocks: None,
handling_mode: None,
});
let prompt = input_to_prompt(&input);
assert!(
prompt.starts_with("Peer request from peer_id 11111111-1111-4111-8111-111111111111")
);
assert!(prompt.contains("\"peer_id\":\"11111111-1111-4111-8111-111111111111\""));
assert!(prompt.contains("\"in_reply_to\":\"req-123\""));
assert!(prompt.contains("\"status\":\"completed\""));
assert!(!prompt.contains("to=\""));
assert!(prompt.contains("Do not use send_message for this reply."));
}
#[test]
fn machine_batch_projection_peer_response_terminal_is_runtime_owned_from_typed_payload() {
let input = Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id: TEST_PEER_RESPONSE_ROUTE_ID.into(),
display_identity: Some("Analyst".into()),
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
convention: Some(crate::input::PeerConvention::ResponseTerminal {
request_id: "018f6f79-7a82-7c4e-a552-a3b86f9630f1".into(),
status: crate::input::ResponseTerminalStatus::Completed,
}),
body: "stale helper-local comms prose".into(),
payload: Some(serde_json::json!({
"request_intent": "checksum_token",
"request_subject": "alpha beta gamma",
"token": "birch seventeen"
})),
blocks: None,
handling_mode: None,
});
let projection = runtime_input_projection_for_machine_batch(&input);
let context = projection
.context_append
.expect("terminal peer response should project in machine batch");
let CoreRenderable::SystemNotice { blocks, .. } = context.content else {
panic!("expected typed terminal context");
};
assert!(matches!(
blocks.first(),
Some(meerkat_core::types::SystemNoticeBlock::Comms { peer, request_id, status, .. })
if peer.as_ref().and_then(|peer| peer.display_name.as_deref()) == Some("Analyst")
&& request_id.as_deref() == Some("018f6f79-7a82-7c4e-a552-a3b86f9630f1")
&& status.as_deref() == Some("completed")
));
}
#[test]
fn machine_batch_projection_peer_response_terminal_omits_payload_key_extraction() {
let input = Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id: TEST_PEER_RESPONSE_ROUTE_ID.into(),
display_identity: Some("Analyst".into()),
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
convention: Some(crate::input::PeerConvention::ResponseTerminal {
request_id: "018f6f79-7a82-7c4e-a552-a3b86f9630f1".into(),
status: crate::input::ResponseTerminalStatus::Completed,
}),
body: "stale helper-local comms prose".into(),
payload: Some(serde_json::json!({
"request_intent": "checksum_token",
"request_subject": "alpha beta gamma",
"token": "birch seventeen"
})),
blocks: None,
handling_mode: None,
});
let projection = runtime_input_projection_for_machine_batch(&input);
let context = projection
.context_append
.expect("terminal peer response should project in machine batch");
let CoreRenderable::SystemNotice { blocks, .. } = context.content else {
panic!("expected typed terminal context");
};
let rendered = blocks
.first()
.and_then(|block| match block {
meerkat_core::types::SystemNoticeBlock::Comms { payload, .. } => {
payload.as_ref().map(ToString::to_string)
}
_ => None,
})
.unwrap_or_default();
assert!(
!rendered.contains("Authoritative result fields:"),
"runtime must not emit scenario-specific authoritative-field hints: {rendered}"
);
assert!(
!rendered.contains("the exact token answer is"),
"runtime must not coach the model about specific payload values: {rendered}"
);
assert!(
!rendered.contains("request_intent=checksum_token"),
"runtime must not re-flatten payload keys into prompt text: {rendered}"
);
}
#[test]
fn input_to_primitive_creates_staged() -> Result<(), String> {
let input = make_prompt("test prompt");
let input_id = input.id().clone();
let primitive = input_to_primitive(&input, input_id.clone())
.expect("single input metadata cannot conflict");
let staged = match primitive {
RunPrimitive::StagedInput(staged) => staged,
other => return Err(format!("expected staged input, got {other:?}")),
};
assert_eq!(staged.boundary, RunApplyBoundary::RunStart);
assert_eq!(staged.contributing_input_ids, vec![input_id]);
assert_eq!(staged.appends.len(), 1);
assert_eq!(staged.appends[0].role, ConversationAppendRole::User);
match &staged.appends[0].content {
CoreRenderable::Text { text } => assert_eq!(text, "test prompt"),
other => return Err(format!("expected text content, got {other:?}")),
}
Ok(())
}
#[test]
fn input_to_primitive_preserves_typed_prompt_appends_without_user_text() -> Result<(), String> {
let typed_append = ConversationAppend {
role: ConversationAppendRole::SystemNotice,
content: CoreRenderable::SystemNotice {
kind: meerkat_core::types::SystemNoticeKind::Comms,
body: Some("Peer message".to_string()),
blocks: vec![meerkat_core::types::SystemNoticeBlock::Comms {
kind: "message".to_string(),
direction: meerkat_core::types::SystemNoticeDirection::Incoming,
peer: None,
request_id: None,
intent: None,
status: None,
summary: Some("Peer message".to_string()),
payload: None,
content: Vec::new(),
}],
},
};
let mut input = make_prompt("");
let Input::Prompt(prompt) = &mut input else {
return Err("expected prompt".to_string());
};
prompt.typed_turn_appends = vec![typed_append.clone()];
let input_id = input.id().clone();
let primitive = input_to_primitive(&input, input_id.clone())
.expect("single input metadata cannot conflict");
let RunPrimitive::StagedInput(staged) = primitive else {
return Err("expected staged input".to_string());
};
assert_eq!(staged.contributing_input_ids, vec![input_id]);
assert_eq!(staged.appends, vec![typed_append]);
Ok(())
}
#[test]
fn staged_conversation_input_starts_conversation_turn_state() -> Result<(), String> {
let input = make_prompt("test prompt");
let input_id = input.id().clone();
let primitive =
input_to_primitive(&input, input_id).expect("single input metadata cannot conflict");
let run_id = RunId::new();
let start_input = primitive_turn_start_input(&run_id, &primitive)
.ok_or_else(|| "expected staged content input to start turn state".to_string())?;
match start_input {
crate::meerkat_machine::dsl::MeerkatMachineInput::StartConversationRun {
run_id: got_run_id,
primitive_kind,
admitted_content_shape,
..
} => {
assert_eq!(
got_run_id,
crate::meerkat_machine::dsl::RunId::from_domain(&run_id)
);
assert_eq!(
primitive_kind,
crate::meerkat_machine::dsl::TurnPrimitiveKind::ConversationTurn
);
assert_eq!(
admitted_content_shape,
crate::meerkat_machine::dsl::ContentShape::Conversation
);
Ok(())
}
other => Err(format!("expected StartConversationRun, got {other:?}")),
}
}
#[test]
fn peer_response_terminal_forced_immediate_boundary_is_invalid_apply_intent()
-> Result<(), String> {
let input = Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id: TEST_PEER_RESPONSE_ROUTE_ID.into(),
display_identity: Some("Analyst".into()),
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
convention: Some(crate::input::PeerConvention::ResponseTerminal {
request_id: "018f6f79-7a82-7c4e-a552-a3b86f9630f1".into(),
status: crate::input::ResponseTerminalStatus::Completed,
}),
body: "stale helper-local comms prose".into(),
payload: Some(serde_json::json!({
"request_intent": "checksum_token",
"request_subject": "alpha beta gamma",
"token": "birch seventeen"
})),
blocks: None,
handling_mode: None,
});
let input_id = input.id().clone();
let primitive = inputs_to_primitive_with_boundary(
&[(input_id.clone(), input)],
RunApplyBoundary::Immediate,
)
.expect("single input metadata cannot conflict");
assert!(
primitive
.peer_response_terminal_apply_intent_violation()
.is_some(),
"terminal peer response with an immediate boundary must fail the typed apply invariant"
);
assert!(!primitive.is_context_only_apply_without_turn());
let staged = match primitive {
RunPrimitive::StagedInput(staged) => staged,
other => return Err(format!("expected staged input, got {other:?}")),
};
assert_eq!(staged.boundary, RunApplyBoundary::Immediate);
assert_eq!(staged.contributing_input_ids, vec![input_id]);
assert!(staged.appends.is_empty());
assert_eq!(staged.context_appends.len(), 1);
assert_eq!(
staged.context_appends[0].key,
format!(
"peer_response_terminal:{TEST_PEER_RESPONSE_ROUTE_ID}:018f6f79-7a82-7c4e-a552-a3b86f9630f1"
)
);
match &staged.context_appends[0].content {
CoreRenderable::SystemNotice { blocks, .. } => {
assert!(matches!(
blocks.first(),
Some(meerkat_core::types::SystemNoticeBlock::Comms { payload, .. })
if payload.as_ref().is_some_and(|payload| payload.to_string().contains("birch seventeen"))
));
}
other => return Err(format!("expected typed content, got {other:?}")),
}
Ok(())
}
#[test]
fn peer_response_terminal_input_boundary_matches_machine_boundary() -> Result<(), String> {
let input = Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id: TEST_PEER_RESPONSE_ROUTE_ID.into(),
display_identity: Some("Analyst".into()),
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
convention: Some(crate::input::PeerConvention::ResponseTerminal {
request_id: "018f6f79-7a82-7c4e-a552-a3b86f9630f1".into(),
status: crate::input::ResponseTerminalStatus::Completed,
}),
body: "done".into(),
payload: Some(serde_json::json!({
"request_intent": "checksum_token",
"request_subject": "alpha beta gamma",
"token": "birch seventeen"
})),
blocks: None,
handling_mode: None,
});
let primitive = inputs_to_primitive(&[(input.id().clone(), input)])
.expect("single input metadata cannot conflict");
let staged = match primitive {
RunPrimitive::StagedInput(staged) => staged,
other => return Err(format!("expected staged input, got {other:?}")),
};
assert_eq!(staged.boundary, RunApplyBoundary::RunStart);
assert!(staged.appends.is_empty());
assert_eq!(staged.context_appends.len(), 1);
Ok(())
}
#[test]
fn queued_peer_response_terminal_starts_requester_reaction_turn() -> Result<(), String> {
let input = Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id: TEST_PEER_RESPONSE_ROUTE_ID.into(),
display_identity: Some("analyst-rt".into()),
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
convention: Some(crate::input::PeerConvention::ResponseTerminal {
request_id: TEST_PEER_RESPONSE_REQUEST_ID.into(),
status: crate::input::ResponseTerminalStatus::Completed,
}),
body: String::new(),
payload: Some(serde_json::json!({
"request_intent": "checksum_token",
"request_subject": "alpha beta gamma",
"token": "birch seventeen"
})),
blocks: None,
handling_mode: None,
});
let primitive = inputs_to_primitive(&[(input.id().clone(), input)])
.expect("single input metadata cannot conflict");
let run_id = RunId::new();
let start_input = primitive_turn_start_input(&run_id, &primitive).ok_or_else(|| {
"terminal response should start a requester reaction turn".to_string()
})?;
match start_input {
crate::meerkat_machine::dsl::MeerkatMachineInput::StartConversationRun {
run_id: got_run_id,
primitive_kind,
admitted_content_shape,
..
} => {
assert_eq!(
got_run_id,
crate::meerkat_machine::dsl::RunId::from_domain(&run_id)
);
assert_eq!(
primitive_kind,
crate::meerkat_machine::dsl::TurnPrimitiveKind::ConversationTurn
);
assert_eq!(
admitted_content_shape,
crate::meerkat_machine::dsl::ContentShape::Context
);
Ok(())
}
other => Err(format!("expected StartConversationRun, got {other:?}")),
}
}
#[test]
fn peer_response_terminal_apply_intent_is_policy_runtime_and_executor_consistent()
-> Result<(), String> {
let input = Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id: TEST_PEER_RESPONSE_ROUTE_ID.into(),
display_identity: Some("analyst-rt".into()),
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
convention: Some(crate::input::PeerConvention::ResponseTerminal {
request_id: TEST_PEER_RESPONSE_REQUEST_ID.into(),
status: crate::input::ResponseTerminalStatus::Completed,
}),
body: String::new(),
payload: Some(serde_json::json!({
"request_intent": "checksum_token",
"request_subject": "alpha beta gamma",
"token": "birch seventeen"
})),
blocks: None,
handling_mode: None,
});
let policy = crate::policy_table::DefaultPolicyTable::resolve(&input, true);
assert_eq!(policy.apply_mode, crate::ApplyMode::StageRunStart);
assert_eq!(policy.wake_mode, crate::WakeMode::WakeIfIdle);
assert_eq!(policy.queue_mode, crate::QueueMode::Fifo);
let semantics = crate::ingress_types::RuntimeInputSemantics::from_policy_and_kind(
&policy,
input.kind(),
);
assert_eq!(semantics.boundary, RunApplyBoundary::RunStart);
assert_eq!(
semantics.execution_kind,
meerkat_core::lifecycle::RuntimeExecutionKind::ContentTurn
);
assert_eq!(
semantics.peer_response_terminal_apply_intent,
Some(PeerResponseTerminalApplyIntent::AppendContextAndRun)
);
let primitive = try_inputs_to_primitive_with_boundary(
&[(input.id().clone(), input)],
semantics.boundary,
&[semantics],
)
.expect("single input metadata cannot conflict");
let metadata = primitive
.turn_metadata()
.ok_or_else(|| "terminal primitive should carry metadata".to_string())?;
assert_eq!(
metadata.peer_response_terminal_apply_intent,
Some(PeerResponseTerminalApplyIntent::AppendContextAndRun)
);
assert!(primitive.is_peer_response_terminal_context_and_run());
assert_eq!(
primitive.peer_response_terminal_apply_intent_violation(),
None
);
assert!(
!primitive.is_context_only_apply_without_turn(),
"executor context-only shortcut must not swallow terminal peer responses"
);
Ok(())
}
fn make_peer_message(peer_id: &str, body: &str) -> Input {
Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id: peer_id.into(),
display_identity: Some("analyst-rt".into()),
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
convention: Some(PeerConvention::Message),
body: body.into(),
payload: None,
blocks: None,
handling_mode: None,
})
}
fn make_terminal_peer_response(peer_id: &str, request_id: &str) -> Input {
Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id: peer_id.into(),
display_identity: Some("analyst-rt".into()),
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
convention: Some(PeerConvention::ResponseTerminal {
request_id: request_id.into(),
status: ResponseTerminalStatus::Completed,
}),
body: String::new(),
payload: Some(serde_json::json!({"ok": true})),
blocks: None,
handling_mode: None,
})
}
async fn accept_queued_input_id(
driver: &crate::meerkat_machine::SharedDriver,
input: Input,
) -> InputId {
let mut guard = driver.lock().await;
match guard
.as_driver_mut()
.accept_input(input)
.await
.expect("input should be accepted")
{
crate::accept::AcceptOutcome::Accepted { input_id, .. } => input_id,
other => panic!("expected accepted queued input, got {other:?}"),
}
}
#[tokio::test]
async fn terminal_peer_response_batches_do_not_mix_with_peer_messages() {
let terminal_first = make_shared_ephemeral_driver("terminal-first");
let terminal_id = accept_queued_input_id(
&terminal_first,
make_terminal_peer_response(TEST_PEER_RESPONSE_ROUTE_ID, TEST_PEER_RESPONSE_REQUEST_ID),
)
.await;
let _message_id = accept_queued_input_id(
&terminal_first,
make_peer_message("analyst-rt", "follow-up"),
)
.await;
{
let guard = terminal_first.lock().await;
let batch = crate::meerkat_machine::machine_select_runtime_loop_batch(&guard);
assert_eq!(
batch,
vec![terminal_id],
"terminal response batch must not absorb later normal peer messages"
);
assert_eq!(
crate::meerkat_machine::machine_batch_runtime_semantics(&guard, &batch).and_then(
|semantics| {
semantics
.into_iter()
.find_map(|semantics| semantics.peer_response_terminal_apply_intent)
}
),
Some(PeerResponseTerminalApplyIntent::AppendContextAndRun)
);
}
let message_first = make_shared_ephemeral_driver("message-first");
let message_id =
accept_queued_input_id(&message_first, make_peer_message("analyst-rt", "first")).await;
let _terminal_id = accept_queued_input_id(
&message_first,
make_terminal_peer_response(
TEST_PEER_RESPONSE_ROUTE_ID,
TEST_PEER_RESPONSE_REQUEST_ID_2,
),
)
.await;
let guard = message_first.lock().await;
let batch = crate::meerkat_machine::machine_select_runtime_loop_batch(&guard);
assert_eq!(
batch,
vec![message_id],
"normal peer message batch must not absorb later terminal responses"
);
assert_eq!(
crate::meerkat_machine::machine_batch_runtime_semantics(&guard, &batch).and_then(
|semantics| {
semantics
.into_iter()
.find_map(|semantics| semantics.peer_response_terminal_apply_intent)
}
),
None
);
}
#[test]
fn peer_input_with_blocks_produces_blocks_renderable() -> Result<(), String> {
let blocks = vec![
meerkat_core::types::ContentBlock::Text {
text: "see this image".into(),
},
meerkat_core::types::ContentBlock::Image {
media_type: "image/png".into(),
data: "abc123".into(),
},
];
let input = Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id: "p".into(),
display_identity: None,
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
convention: Some(crate::input::PeerConvention::Message),
body: "see this image".into(),
payload: None,
blocks: Some(blocks.clone()),
handling_mode: None,
});
let input_id = input.id().clone();
let primitive =
input_to_primitive(&input, input_id).expect("single input metadata cannot conflict");
let staged = match primitive {
RunPrimitive::StagedInput(staged) => staged,
other => return Err(format!("expected staged input, got {other:?}")),
};
assert_eq!(staged.appends.len(), 1);
match &staged.appends[0].content {
CoreRenderable::SystemNotice { blocks: got, .. } => {
let Some(meerkat_core::types::SystemNoticeBlock::Comms { content, peer, .. }) =
got.first()
else {
return Err("expected comms block".into());
};
assert_eq!(peer.as_ref().map(|peer| peer.id.as_str()), Some("p"));
assert_eq!(content, &blocks);
}
other => return Err(format!("expected typed content, got {other:?}")),
}
Ok(())
}
#[test]
fn peer_multimodal_append_keeps_source_without_duplicating_block_text() -> Result<(), String> {
let blocks = vec![
meerkat_core::types::ContentBlock::Text {
text: "caption text".into(),
},
meerkat_core::types::ContentBlock::Image {
media_type: "image/png".into(),
data: "abc123".into(),
},
];
let input = Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id: "peer-1".into(),
display_identity: None,
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
convention: Some(crate::input::PeerConvention::Message),
body: "caption text".into(),
payload: None,
blocks: Some(blocks.clone()),
handling_mode: None,
});
let staged = match input_to_primitive(&input, input.id().clone())
.expect("single input metadata cannot conflict")
{
RunPrimitive::StagedInput(staged) => staged,
other => return Err(format!("expected staged input, got {other:?}")),
};
match &staged.appends[0].content {
CoreRenderable::SystemNotice { blocks: got, .. } => {
let Some(meerkat_core::types::SystemNoticeBlock::Comms { content, peer, .. }) =
got.first()
else {
return Err("expected comms block".into());
};
assert_eq!(peer.as_ref().map(|peer| peer.id.as_str()), Some("peer-1"));
assert_eq!(content, &blocks);
}
other => return Err(format!("expected typed content, got {other:?}")),
}
Ok(())
}
#[test]
fn peer_image_only_blocks_keep_source_identity_without_text_duplication() -> Result<(), String>
{
let blocks = vec![meerkat_core::types::ContentBlock::Image {
media_type: "image/png".into(),
data: "abc123".into(),
}];
let input = Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id: "peer-1".into(),
display_identity: None,
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
convention: Some(crate::input::PeerConvention::Message),
body: String::new(),
payload: None,
blocks: Some(blocks.clone()),
handling_mode: None,
});
let staged = match input_to_primitive(&input, input.id().clone())
.expect("single input metadata cannot conflict")
{
RunPrimitive::StagedInput(staged) => staged,
other => return Err(format!("expected staged input, got {other:?}")),
};
match &staged.appends[0].content {
CoreRenderable::SystemNotice { blocks: got, .. } => {
let Some(meerkat_core::types::SystemNoticeBlock::Comms { content, peer, .. }) =
got.first()
else {
return Err("expected comms block".into());
};
assert_eq!(peer.as_ref().map(|peer| peer.id.as_str()), Some("peer-1"));
assert_eq!(content, &blocks);
}
other => return Err(format!("expected typed content, got {other:?}")),
}
Ok(())
}
#[test]
fn peer_image_only_blocks_preserve_rendered_body_text() -> Result<(), String> {
let blocks = vec![meerkat_core::types::ContentBlock::Image {
media_type: "image/png".into(),
data: "abc123".into(),
}];
let input = Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id: "peer-1".into(),
display_identity: None,
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
convention: Some(crate::input::PeerConvention::Message),
body: "Please describe the attached image.".into(),
payload: None,
blocks: Some(blocks.clone()),
handling_mode: None,
});
let staged = match input_to_primitive(&input, input.id().clone())
.expect("single input metadata cannot conflict")
{
RunPrimitive::StagedInput(staged) => staged,
other => return Err(format!("expected staged input, got {other:?}")),
};
match &staged.appends[0].content {
CoreRenderable::SystemNotice { blocks: got, .. } => {
let Some(meerkat_core::types::SystemNoticeBlock::Comms { content, peer, .. }) =
got.first()
else {
return Err("expected comms block".into());
};
assert_eq!(peer.as_ref().map(|peer| peer.id.as_str()), Some("peer-1"));
assert_eq!(
content.first(),
Some(&meerkat_core::types::ContentBlock::Text {
text: "Please describe the attached image.".into(),
})
);
assert_eq!(content.get(1), Some(&blocks[0]));
}
other => return Err(format!("expected blocks content, got {other:?}")),
}
Ok(())
}
#[test]
fn flow_step_with_blocks_produces_blocks_renderable() -> Result<(), String> {
let blocks = vec![
meerkat_core::types::ContentBlock::Text {
text: "analyze this screenshot".into(),
},
meerkat_core::types::ContentBlock::Image {
media_type: "image/png".into(),
data: "abc123".into(),
},
];
let input = Input::FlowStep(FlowStepInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Flow {
flow_id: "flow-1".into(),
step_index: 0,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
step_id: "step-1".into(),
instructions: "analyze this screenshot".into(),
blocks: Some(blocks),
turn_metadata: None,
});
let input_id = input.id().clone();
let primitive =
input_to_primitive(&input, input_id).expect("single input metadata cannot conflict");
let staged = match primitive {
RunPrimitive::StagedInput(staged) => staged,
other => return Err(format!("expected staged input, got {other:?}")),
};
assert_eq!(staged.appends.len(), 1);
match &staged.appends[0].content {
CoreRenderable::SystemNotice { blocks: got, .. } => {
assert!(matches!(
got.first(),
Some(meerkat_core::types::SystemNoticeBlock::RuntimeNotice { category, detail, .. })
if category == "flow_step" && detail.as_deref() == Some("analyze this screenshot")
));
}
other => return Err(format!("expected typed content, got {other:?}")),
}
Ok(())
}
#[test]
fn external_event_with_blocks_produces_blocks_renderable() -> Result<(), String> {
let blocks = vec![
meerkat_core::types::ContentBlock::Text {
text: "see this event".into(),
},
meerkat_core::types::ContentBlock::Image {
media_type: "image/png".into(),
data: "abc123".into(),
},
];
let input = Input::ExternalEvent(crate::input::ExternalEventInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::External {
source_name: "webhook".into(),
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
event_type: "webhook".into(),
payload: serde_json::json!({"body": "see this event"}),
blocks: Some(blocks.clone()),
handling_mode: meerkat_core::types::HandlingMode::Queue,
render_metadata: None,
});
let input_id = input.id().clone();
let primitive =
input_to_primitive(&input, input_id).expect("single input metadata cannot conflict");
let staged = match primitive {
RunPrimitive::StagedInput(staged) => staged,
other => return Err(format!("expected staged input, got {other:?}")),
};
assert_eq!(staged.appends.len(), 1);
match &staged.appends[0].content {
CoreRenderable::SystemNotice { blocks: got, .. } => {
let Some(meerkat_core::types::SystemNoticeBlock::ExternalEvent { content, .. }) =
got.first()
else {
return Err("expected external event block".into());
};
assert_eq!(content, &blocks);
}
other => return Err(format!("expected typed content, got {other:?}")),
}
Ok(())
}
#[test]
fn external_event_with_image_only_blocks_keeps_event_identity() -> Result<(), String> {
let blocks = vec![meerkat_core::types::ContentBlock::Image {
media_type: "image/png".into(),
data: "abc123".into(),
}];
let input = Input::ExternalEvent(crate::input::ExternalEventInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::External {
source_name: "webhook".into(),
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
event_type: "webhook".into(),
payload: serde_json::json!({"body": "see attached screenshot"}),
blocks: Some(blocks.clone()),
handling_mode: meerkat_core::types::HandlingMode::Queue,
render_metadata: None,
});
let primitive = input_to_primitive(&input, input.id().clone())
.expect("single input metadata cannot conflict");
let staged = match primitive {
RunPrimitive::StagedInput(staged) => staged,
other => return Err(format!("expected staged input, got {other:?}")),
};
match &staged.appends[0].content {
CoreRenderable::SystemNotice { blocks: got, .. } => {
let Some(meerkat_core::types::SystemNoticeBlock::ExternalEvent { content, .. }) =
got.first()
else {
return Err("expected external event block".into());
};
assert_eq!(content, &blocks);
}
other => return Err(format!("expected typed content, got {other:?}")),
}
Ok(())
}
#[test]
fn external_event_prefers_source_name_over_event_type_without_body() -> Result<(), String> {
let input = Input::ExternalEvent(crate::input::ExternalEventInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::External {
source_name: "webhook".into(),
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
event_type: "invoice.created".into(),
payload: serde_json::json!({"invoice_id": "inv_123"}),
blocks: None,
handling_mode: meerkat_core::types::HandlingMode::Queue,
render_metadata: None,
});
assert_eq!(input_to_prompt(&input), "External event via webhook");
Ok(())
}
#[test]
fn plain_event_and_direct_runtime_external_event_share_projection() -> Result<(), String> {
use crate::comms_bridge::peer_input_candidate_to_runtime_input;
use crate::identifiers::LogicalRuntimeId;
use meerkat_core::interaction::{
InboxInteraction, InteractionContent, PeerInputCandidate, PeerInputClass,
};
let interaction_id = meerkat_core::interaction::InteractionId(uuid::Uuid::new_v4());
let from_comms = peer_input_candidate_to_runtime_input(
&PeerInputCandidate {
lifecycle_peer: None,
response_terminality: None,
ingress: meerkat_core::PeerIngressFact::plain_event(
interaction_id,
"webhook",
PeerInputClass::PlainEvent,
meerkat_core::PeerIngressKind::PlainEvent,
),
interaction: InboxInteraction {
id: interaction_id,
from_route: None,
from: "event:webhook".into(),
content: InteractionContent::Message {
body: "build failed".into(),
blocks: None,
},
rendered_text: "External event via webhook: build failed".into(),
handling_mode: meerkat_core::types::HandlingMode::Queue,
render_metadata: None,
},
},
&LogicalRuntimeId::new("test"),
)
.map_err(|err| err.to_string())?;
let direct = Input::ExternalEvent(crate::input::ExternalEventInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::External {
source_name: "webhook".into(),
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
event_type: "webhook".into(),
payload: serde_json::json!({"body": "build failed"}),
blocks: None,
handling_mode: meerkat_core::types::HandlingMode::Queue,
render_metadata: None,
});
assert_eq!(input_to_prompt(&from_comms), input_to_prompt(&direct));
assert_eq!(
input_to_prompt(&direct),
"External event via webhook: build failed"
);
Ok(())
}
#[test]
fn external_event_with_steer_preserves_runtime_hints() -> Result<(), String> {
let render_metadata = meerkat_core::types::RenderMetadata {
class: meerkat_core::types::RenderClass::ExternalEvent,
salience: meerkat_core::types::RenderSalience::Urgent,
};
let input = Input::ExternalEvent(crate::input::ExternalEventInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::External {
source_name: "webhook".into(),
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
event_type: "webhook".into(),
payload: serde_json::json!({"body": "urgent"}),
blocks: None,
handling_mode: meerkat_core::types::HandlingMode::Steer,
render_metadata: Some(render_metadata.clone()),
});
let staged = match input_to_primitive(&input, input.id().clone())
.expect("single input metadata cannot conflict")
{
RunPrimitive::StagedInput(staged) => staged,
other => return Err(format!("expected staged input, got {other:?}")),
};
assert_eq!(staged.boundary, RunApplyBoundary::RunCheckpoint);
assert_eq!(
staged
.turn_metadata
.as_ref()
.and_then(|meta| meta.handling_mode),
Some(meerkat_core::types::HandlingMode::Steer)
);
assert_eq!(
staged
.turn_metadata
.as_ref()
.and_then(|meta| meta.render_metadata.clone()),
Some(render_metadata)
);
Ok(())
}
#[tokio::test]
async fn resolve_completion_waiters_surfaces_callback_pending() {
let mut registry = crate::completion::CompletionRegistry::new();
let input_id = InputId::new();
let handle = registry.register(input_id.clone());
resolve_completion_waiters(
&mut registry,
std::slice::from_ref(&input_id),
Some(CoreApplyTerminal::CallbackPending {
tool_name: "external_mock".to_string(),
args: serde_json::json!({ "value": "browser" }),
}),
);
match handle.wait().await {
crate::completion::CompletionOutcome::CallbackPending { tool_name, args } => {
assert_eq!(tool_name, "external_mock");
assert_eq!(args, serde_json::json!({ "value": "browser" }));
}
other => panic!("Expected CallbackPending, got {other:?}"),
}
}
#[tokio::test]
async fn resolve_completion_waiters_surfaces_terminal_run_result() {
let mut registry = crate::completion::CompletionRegistry::new();
let input_id = InputId::new();
let handle = registry.register(input_id.clone());
let run_result = meerkat_core::types::RunResult {
text: "terminal authority".to_string(),
session_id: SessionId::new(),
usage: meerkat_core::types::Usage::default(),
turns: 1,
tool_calls: 0,
terminal_cause_kind: None,
structured_output: None,
extraction_error: None,
schema_warnings: None,
skill_diagnostics: None,
};
resolve_completion_waiters(
&mut registry,
std::slice::from_ref(&input_id),
Some(CoreApplyTerminal::RunResult(Box::new(run_result))),
);
match handle.wait().await {
crate::completion::CompletionOutcome::Completed(result) => {
assert_eq!(result.text, "terminal authority");
}
other => panic!("Expected Completed, got {other:?}"),
}
}
#[tokio::test]
async fn abandon_completion_waiters_surfaces_abandoned() {
let mut registry = crate::completion::CompletionRegistry::new();
let input_id = InputId::new();
let handle = registry.register(input_id.clone());
abandon_completion_waiters(
&mut registry,
std::slice::from_ref(&input_id),
"runtime loop failed before executor apply",
);
match handle.wait().await {
crate::completion::CompletionOutcome::Abandoned(reason) => {
assert_eq!(reason, "runtime loop failed before executor apply");
}
other => panic!("Expected Abandoned, got {other:?}"),
}
}
#[tokio::test]
async fn maybe_inject_feed_wake_feed_path_injects_inline_continuation_when_quiescent() {
let driver = make_shared_ephemeral_driver("feed-inline");
let registry = crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new();
let spec = background_spec("feed-inline");
let op_id = spec.id.clone();
registry.register_operation(spec).unwrap();
registry.provisioning_succeeded(&op_id).unwrap();
registry
.complete_operation(&op_id, op_result(&op_id, "done"))
.unwrap();
let feed = registry.completion_feed_handle();
let mut observed_seq = 0;
let mut last_injected_seq = 0;
let injected = maybe_inject_feed_wake(
&driver,
Some(feed.as_ref()),
&mut observed_seq,
&mut last_injected_seq,
None,
)
.await;
assert!(
injected,
"feed-backed path should inject inline when quiescent"
);
assert_eq!(observed_seq, feed.watermark());
assert_eq!(last_injected_seq, feed.watermark());
let mut guard = driver.lock().await;
assert_eq!(guard.as_driver().active_input_ids().len(), 1);
let (_input_id, input) = guard
.dequeue_next()
.expect("continuation should be queued inline");
match input {
Input::Continuation(continuation) => {
assert_eq!(continuation.reason, "detached_background_op_completed");
assert_eq!(
continuation.handling_mode,
meerkat_core::types::HandlingMode::Steer
);
}
other => panic!("expected inline continuation injection, got {other:?}"),
}
}
#[tokio::test]
async fn maybe_inject_feed_wake_without_feed_is_noop() {
let driver = make_shared_ephemeral_driver("no-feed");
let mut observed_seq = 0;
let mut last_injected_seq = 0;
let injected = maybe_inject_feed_wake(
&driver,
None,
&mut observed_seq,
&mut last_injected_seq,
None,
)
.await;
assert!(!injected, "no feed means no injection");
assert_eq!(observed_seq, 0);
assert_eq!(last_injected_seq, 0);
let guard = driver.lock().await;
assert!(
guard.as_driver().active_input_ids().is_empty(),
"no feed must not enqueue anything"
);
}
#[tokio::test]
async fn maybe_inject_feed_wake_no_bg_completions_advances_observed_seq() {
let driver = make_shared_ephemeral_driver("advance-only");
let registry = crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new();
let feed = registry.completion_feed_handle();
let mut observed_seq = 0;
let mut last_injected_seq = 0;
let injected = maybe_inject_feed_wake(
&driver,
Some(feed.as_ref()),
&mut observed_seq,
&mut last_injected_seq,
None,
)
.await;
assert!(!injected, "empty feed should not inject");
assert_eq!(observed_seq, feed.watermark());
assert_eq!(last_injected_seq, 0);
let guard = driver.lock().await;
assert!(guard.as_driver().active_input_ids().is_empty());
}
#[test]
fn primitive_from_prompt_has_content_turn() {
let input = make_prompt("hello");
let id = input.id().clone();
let primitive =
input_to_primitive(&input, id).expect("single input metadata cannot conflict");
let meta = primitive
.turn_metadata()
.expect("should have turn_metadata");
assert_eq!(
meta.execution_kind,
Some(meerkat_core::lifecycle::RuntimeExecutionKind::ContentTurn)
);
}
#[test]
fn primitive_from_peer_input_preserves_explicit_handling_mode() {
let mut input = make_peer_message("peer-steer", "urgent helper update");
let Input::Peer(peer) = &mut input else {
panic!("make_peer_message must build a peer input");
};
peer.handling_mode = Some(meerkat_core::types::HandlingMode::Steer);
let primitive = input_to_primitive(&input, input.id().clone())
.expect("single peer input metadata cannot conflict");
let meta = primitive
.turn_metadata()
.expect("peer primitive should carry turn metadata");
assert_eq!(
meta.handling_mode,
Some(meerkat_core::types::HandlingMode::Steer),
"peer handling_mode must survive primitive construction for live/steer projection"
);
}
#[test]
fn primitive_from_continuation_has_resume_pending() {
let input = Input::Continuation(ContinuationInput::detached_background_op_completed());
let id = input.id().clone();
let primitive =
input_to_primitive(&input, id).expect("single input metadata cannot conflict");
let meta = primitive
.turn_metadata()
.expect("should have turn_metadata");
assert_eq!(
meta.execution_kind,
Some(meerkat_core::lifecycle::RuntimeExecutionKind::ResumePending)
);
}
#[test]
fn admitted_input_primitive_uses_runtime_stamped_execution_kind() {
let input = make_prompt("test prompt");
let id = input.id().clone();
let primitive = admitted_input_to_primitive(
&input,
id,
crate::input::runtime_input_projection(&input),
crate::ingress_types::RuntimeInputSemantics {
boundary: RunApplyBoundary::RunStart,
execution_kind: meerkat_core::lifecycle::RuntimeExecutionKind::ResumePending,
peer_response_terminal_apply_intent: None,
},
)
.expect("single input metadata cannot conflict");
let meta = primitive
.turn_metadata()
.expect("should have turn_metadata");
assert_eq!(
meta.execution_kind,
Some(meerkat_core::lifecycle::RuntimeExecutionKind::ResumePending),
"primitive construction must use the runtime-stamped execution kind, not the local input kind"
);
}
#[test]
fn primitive_from_peer_terminal_has_content_turn() {
let input = Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id: "p".into(),
display_identity: None,
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
convention: Some(PeerConvention::ResponseTerminal {
request_id: "018f6f79-7a82-7c4e-a552-a3b86f9630f1".into(),
status: ResponseTerminalStatus::Completed,
}),
body: "done".into(),
payload: None,
blocks: None,
handling_mode: None,
});
let id = input.id().clone();
let primitive =
input_to_primitive(&input, id).expect("single input metadata cannot conflict");
let meta = primitive
.turn_metadata()
.expect("should have turn_metadata");
assert_eq!(
meta.execution_kind,
Some(meerkat_core::lifecycle::RuntimeExecutionKind::ContentTurn)
);
}
#[test]
fn mixed_batch_execution_kind_conflict_is_rejected() {
let peer = Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id: "p".into(),
display_identity: None,
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
convention: Some(PeerConvention::Message),
body: "msg".into(),
payload: None,
blocks: None,
handling_mode: None,
});
let continuation =
Input::Continuation(ContinuationInput::detached_background_op_completed());
let inputs = vec![
(peer.id().clone(), peer),
(continuation.id().clone(), continuation),
];
let err = inputs_to_primitive_with_boundary(&inputs, RunApplyBoundary::RunCheckpoint)
.expect_err("mixed execution kinds should be rejected");
assert_eq!(err.field, "execution_kind");
}
#[test]
fn batch_metadata_conflict_surfaces_typed_error() {
let mut first = make_prompt("first");
if let Input::Prompt(prompt) = &mut first {
prompt.turn_metadata = Some(
meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata {
model: Some(meerkat_core::lifecycle::run_primitive::ModelId::new(
"model-a",
)),
..Default::default()
},
);
}
let mut second = make_prompt("second");
if let Input::Prompt(prompt) = &mut second {
prompt.turn_metadata = Some(
meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata {
model: Some(meerkat_core::lifecycle::run_primitive::ModelId::new(
"model-b",
)),
..Default::default()
},
);
}
let inputs = vec![(first.id().clone(), first), (second.id().clone(), second)];
let semantics = fallback_batch_semantics(&inputs);
let err =
try_inputs_to_primitive_with_boundary(&inputs, RunApplyBoundary::RunStart, &semantics)
.expect_err("conflicting batch metadata should be rejected");
assert_eq!(err.field, "model");
}
}