use std::sync::Arc;
use tokio::sync::{Mutex as AsyncMutex, RwLock};
use crate::mcp::McpPool;
use crate::tools::ToolRegistry;
use zagens_core::capacity::{CapacityDecision, CapacitySnapshot, GuardrailAction};
use zagens_core::engine::turn_loop::continuation_boundary_policy::OuterBoundaryKind;
use zagens_core::engine::turn_loop::live_turn_outer_planner::{
CapacityCheckpointEffectTail, plan_capacity_checkpoint_effect_tail,
verify_capacity_tail_alignment,
};
use zagens_core::engine::turn_machine::{Effect, capacity_cooldown_backoff_millis};
use zagens_core::turn::{TurnContext, TurnLoopMode};
use super::super::compaction_ops::RunCompactionScope;
use super::super::effect_interpreter::EffectInterpreter;
use super::super::*;
pub(in crate::core::engine) struct CapacityDispatchContext<'a> {
pub turn: &'a TurnContext,
pub mode: TurnLoopMode,
pub snapshot: Option<&'a CapacitySnapshot>,
pub decision: &'a CapacityDecision,
pub client: Option<&'a dyn crate::llm_client::LlmClient>,
pub tool_registry: Option<&'a ToolRegistry>,
pub tool_exec_lock: Option<Arc<RwLock<()>>>,
pub mcp_pool: Option<Arc<AsyncMutex<McpPool>>>,
pub handoff_reason: &'a str,
pub hold_boundary: Option<OuterBoundaryKind>,
}
impl Engine {
fn verify_v3_capacity_tail_if_enabled(
&self,
action: GuardrailAction,
cooldown_blocked: bool,
interpreted: CapacityCheckpointEffectTail,
) {
if !self.runtime_ext().kernel_machine_mode.uses_v3_turn_loop() {
return;
}
let planned = plan_capacity_checkpoint_effect_tail(action, cooldown_blocked);
if let Some(summary) = verify_capacity_tail_alignment(planned, interpreted) {
tracing::warn!(
target: "kernel_v3",
%summary,
?action,
cooldown_blocked,
"capacity tail replay diff"
);
}
}
pub(in crate::core::engine) async fn dispatch_capacity_decision(
&mut self,
ctx: CapacityDispatchContext<'_>,
) -> bool {
if ctx.decision.cooldown_blocked {
let interpreted = CapacityCheckpointEffectTail::Sleep {
millis: capacity_cooldown_backoff_millis(),
};
self.verify_v3_capacity_tail_if_enabled(ctx.decision.action, true, interpreted);
return false;
}
let action = ctx.decision.action;
let result = match action {
GuardrailAction::TargetedContextRefresh => {
self.route_capacity_trim_refresh(ctx.turn, ctx.client, ctx.mode, ctx.snapshot)
.await
}
GuardrailAction::VerifyAndReplan => {
self.route_capacity_handoff_replan(
ctx.turn,
ctx.mode,
ctx.snapshot,
ctx.handoff_reason,
)
.await
}
GuardrailAction::VerifyWithToolReplay => {
let Some(registry) = ctx.tool_registry else {
let interpreted = CapacityCheckpointEffectTail::None;
self.verify_v3_capacity_tail_if_enabled(action, false, interpreted);
self.log_capacity_hold_planner_if_enabled(&ctx, action, interpreted, false);
return false;
};
let Some(lock) = ctx.tool_exec_lock.clone() else {
let interpreted = CapacityCheckpointEffectTail::None;
self.verify_v3_capacity_tail_if_enabled(action, false, interpreted);
self.log_capacity_hold_planner_if_enabled(&ctx, action, interpreted, false);
return false;
};
let replay_ok = self
.route_capacity_tool_replay(
ctx.turn,
ctx.mode,
ctx.snapshot,
Some(registry),
lock,
ctx.mcp_pool.clone(),
)
.await;
let interpreted = CapacityCheckpointEffectTail::None;
self.verify_v3_capacity_tail_if_enabled(action, false, interpreted);
self.log_capacity_hold_planner_if_enabled(&ctx, action, interpreted, replay_ok);
return replay_ok;
}
GuardrailAction::NoIntervention => false,
};
let interpreted = if matches!(
action,
GuardrailAction::TargetedContextRefresh | GuardrailAction::VerifyAndReplan
) {
CapacityCheckpointEffectTail::RunCompaction
} else {
CapacityCheckpointEffectTail::None
};
self.verify_v3_capacity_tail_if_enabled(action, false, interpreted);
self.log_capacity_hold_planner_if_enabled(&ctx, action, interpreted, result);
result
}
fn log_capacity_hold_planner_if_enabled(
&self,
ctx: &CapacityDispatchContext<'_>,
action: GuardrailAction,
interpreted: CapacityCheckpointEffectTail,
held: bool,
) {
let Some(boundary) = ctx.hold_boundary else {
return;
};
self.log_v3_capacity_hold_planner_effect(
boundary,
&ctx.turn.id,
ctx.turn.step,
action,
ctx.decision.cooldown_blocked,
interpreted,
held,
);
}
pub(in crate::core::engine) async fn route_capacity_trim_refresh(
&mut self,
turn: &TurnContext,
client: Option<&dyn crate::llm_client::LlmClient>,
mode: TurnLoopMode,
snapshot: Option<&CapacitySnapshot>,
) -> bool {
if !self.runtime_ext().kernel_machine_mode.uses_v3_turn_loop() {
return self
.apply_targeted_context_refresh(turn, client, mode, snapshot)
.await;
}
tracing::info!(
target: "kernel_v3",
turn_id = %turn.id,
step = turn.step,
"v3 capacity: RunCompaction trim (effect plan)"
);
let ext = self.runtime_ext_mut();
ext.kernel_run_compaction_scope = Some(RunCompactionScope::CapacityTrim);
ext.kernel_capacity_snapshot = snapshot.cloned();
ext.kernel_capacity_turn_mode = Some(mode);
ext.kernel_capacity_handoff_reason = None;
ext.kernel_capacity_intervention_ok = None;
let mut interpreter = EffectInterpreter::new(self);
let _ = interpreter.interpret(Effect::RunCompaction).await;
self.runtime_ext_mut()
.kernel_capacity_intervention_ok
.take()
.unwrap_or(false)
}
pub(in crate::core::engine) async fn route_capacity_handoff_replan(
&mut self,
turn: &TurnContext,
mode: TurnLoopMode,
snapshot: Option<&CapacitySnapshot>,
reason: &str,
) -> bool {
if !self.runtime_ext().kernel_machine_mode.uses_v3_turn_loop() {
return self
.apply_verify_and_replan(turn, mode, snapshot, reason)
.await;
}
tracing::info!(
target: "kernel_v3",
turn_id = %turn.id,
step = turn.step,
reason,
"v3 capacity: RunCompaction handoff (effect plan)"
);
let ext = self.runtime_ext_mut();
ext.kernel_run_compaction_scope = Some(RunCompactionScope::CapacityHandoff);
ext.kernel_capacity_snapshot = snapshot.cloned();
ext.kernel_capacity_turn_mode = Some(mode);
ext.kernel_capacity_handoff_reason = Some(reason.to_string());
ext.kernel_capacity_intervention_ok = None;
let mut interpreter = EffectInterpreter::new(self);
let _ = interpreter.interpret(Effect::RunCompaction).await;
self.runtime_ext_mut()
.kernel_capacity_intervention_ok
.take()
.unwrap_or(false)
}
pub(in crate::core::engine) async fn route_capacity_tool_replay(
&mut self,
turn: &TurnContext,
mode: TurnLoopMode,
snapshot: Option<&CapacitySnapshot>,
tool_registry: Option<&ToolRegistry>,
tool_exec_lock: Arc<RwLock<()>>,
mcp_pool: Option<Arc<AsyncMutex<McpPool>>>,
) -> bool {
if !self.runtime_ext().kernel_machine_mode.uses_v3_turn_loop() {
return self
.apply_verify_with_tool_replay(
turn,
mode,
snapshot,
tool_registry,
tool_exec_lock,
mcp_pool,
)
.await;
}
tracing::info!(
target: "kernel_v3",
turn_id = %turn.id,
step = turn.step,
"v3 capacity: tool replay (effect plan)"
);
if self.effect_replay_anchor_only() {
tracing::info!(
target: "kernel_v3",
turn_id = %turn.id,
"replay anchor-only: skipping capacity tool replay IO"
);
return true;
}
self.apply_verify_with_tool_replay(
turn,
mode,
snapshot,
tool_registry,
tool_exec_lock,
mcp_pool,
)
.await
}
pub(in crate::core::engine) async fn route_capacity_cooldown_sleep(&mut self) {
if !self.runtime_ext().kernel_machine_mode.uses_v3_turn_loop() {
return;
}
let mut interpreter = EffectInterpreter::new(self);
let _ = interpreter
.interpret(Effect::Sleep {
millis: capacity_cooldown_backoff_millis(),
})
.await;
}
}