use std::collections::HashSet;
use crate::chat::{LlmClient, Tool};
use crate::engine::context::{TURN_MAX_OUTPUT_TOKENS, context_input_budget};
use crate::engine::loop_guard::LoopGuard;
use crate::engine::streaming::ToolUseState;
use crate::turn::{TurnContext, TurnLoopMode};
use super::control::{TurnLoopStreamingPhaseOutcome, TurnLoopToolPhaseOutcome};
use super::host::V3TurnHost;
use super::inner_step_host::InnerStepHost;
use super::live_turn_machine::LiveTurnMachine;
use super::turn_loop_outer_host::TurnLoopOuterHost;
use super::{streaming_phase, tool_phase, v3_driver};
use crate::engine::kernel_turn_host::KernelTurnHost;
use crate::engine::turn_machine::{TurnKernelProjection, events_for_step};
/// Combined result of one v3 inner step: what the streaming phase produced
/// and what the tool-execution phase produced.
#[derive(Debug)]
pub struct V3StepOutcome {
    /// Outcome of the model streaming phase for this step.
    pub stream: TurnLoopStreamingPhaseOutcome,
    /// Outcome of the tool-execution phase for this step.
    pub tools: TurnLoopToolPhaseOutcome,
}
/// Returns the call ID of every tool use in `tool_uses`, in slice order.
///
/// Each ID is cloned, so the returned vector owns its strings and the input
/// is left untouched.
#[must_use]
pub fn execute_batch_call_ids(tool_uses: &[ToolUseState]) -> Vec<String> {
    let mut call_ids = Vec::with_capacity(tool_uses.len());
    for tool_use in tool_uses {
        call_ids.push(tool_use.id.clone());
    }
    call_ids
}
/// Runs one inner step of a v3 turn on the core fallback path.
///
/// The step proceeds in this order:
/// 1. Derive the input token budget for the session's model and build the
///    live effect plan from the host's kernel turn events.
/// 2. Log the plan and each pre-call-model effect (these are only logged
///    here, not executed).
/// 3. Run the streaming phase (the model call).
/// 4. Run the tool-execution phase, but only when the stream produced tool
///    uses *and* the plan has `execute_batch_per_call` set; otherwise the
///    tool outcome is `TurnLoopToolPhaseOutcome::default()`.
/// 5. When the plan has `notify_lsp_tail` set, flush the host's pending LSP
///    diagnostics once per notify effect derived from this step's events.
///
/// Returns the streaming and tool outcomes bundled in a [`V3StepOutcome`].
// The step's mutable state is passed as individual arguments, hence the
// long parameter list.
#[allow(clippy::too_many_arguments)]
pub async fn run_v3_step<H: InnerStepHost + TurnLoopOuterHost>(
    host: &mut H,
    turn: &mut TurnContext,
    client: &dyn LlmClient,
    mode: TurnLoopMode,
    tool_catalog: &mut [Tool],
    active_tool_names: &mut HashSet<String>,
    force_update_plan_first: bool,
    stream_retry_attempts: &mut u32,
    context_recovery_attempts: &mut u8,
    length_continuations: &mut u32,
    turn_error: &mut Option<String>,
    loop_guard: &mut LoopGuard,
    consecutive_tool_error_steps: u32,
    tool_registry: Option<&H::ToolRegistry>,
) -> V3StepOutcome {
    let model = host.session_mut().model.clone();
    // Saturate the budget to `u32`; when `context_input_budget` yields no
    // value, fall back to `TURN_MAX_OUTPUT_TOKENS`.
    let token_budget = context_input_budget(&model, TURN_MAX_OUTPUT_TOKENS)
        .map_or(TURN_MAX_OUTPUT_TOKENS, |b| b.min(u32::MAX as usize) as u32);

    let machine = LiveTurnMachine::default();
    let projection = TurnKernelProjection::from_events(&host.kernel_turn_events());
    let live_plan = machine.inner_step_live_plan(&projection, token_budget, None);
    v3_driver::log_inner_step_effect_plan(&turn.id, turn.step, &live_plan.baseline);

    // Pre-call-model effects are only traced on this path.
    for effect in &live_plan.baseline.pre_call_model {
        tracing::debug!(
            target: "kernel_v3",
            turn_id = %turn.id,
            step = turn.step,
            ?effect,
            "v3 step: QueryMemory (core fallback — no runtime interpreter IO)"
        );
    }

    tracing::info!(
        target: "kernel_v3",
        turn_id = %turn.id,
        step = turn.step,
        token_budget,
        "v3 step: CallModel"
    );
    let mut stream = streaming_phase::run_streaming_phase(
        host,
        turn,
        client,
        mode,
        tool_catalog,
        active_tool_names,
        force_update_plan_first,
        stream_retry_attempts,
        context_recovery_attempts,
        length_continuations,
        turn_error,
    )
    .await;

    // Tools run only when there is something to execute and the plan asks
    // for a per-call batch.
    let run_tools = !stream.tool_uses.is_empty() && live_plan.baseline.execute_batch_per_call;
    let tools = if run_tools {
        // Only the count is logged, so read it from the collection directly
        // instead of cloning every call ID into a throwaway vector.
        tracing::info!(
            target: "kernel_v3",
            turn_id = %turn.id,
            step = turn.step,
            call_count = stream.tool_uses.len(),
            "v3 step: ExecuteBatch"
        );
        tool_phase::run_tool_execution_phase(
            host,
            turn,
            mode,
            &mut stream.tool_uses,
            tool_catalog,
            active_tool_names,
            loop_guard,
            consecutive_tool_error_steps,
            tool_registry,
        )
        .await
    } else {
        TurnLoopToolPhaseOutcome::default()
    };

    if live_plan.baseline.notify_lsp_tail {
        // Re-read the events: the phases above may have appended to them.
        let step_events = events_for_step(&host.kernel_turn_events(), turn.step);
        let notify_tail = machine.inner_step_notify_lsp_effects(&step_events);
        if !notify_tail.is_empty() {
            tracing::info!(
                target: "kernel_v3",
                turn_id = %turn.id,
                step = turn.step,
                notify_count = notify_tail.len(),
                "v3 step: NotifyLsp tail (core fallback)"
            );
            // One flush per planned effect; the effect payload itself is
            // not consulted on this path.
            for _effect in notify_tail {
                host.flush_pending_lsp_diagnostics().await;
            }
        }
    }

    V3StepOutcome { stream, tools }
}
/// Single entry point for running one v3 turn step.
///
/// First offers the step to `KernelTurnHost::try_run_v3_turn_step`. If that
/// returns `Some(outcome)`, the outcome is returned as-is. If it returns
/// `None`, the step is run through [`run_v3_step`] with the same arguments.
// All arguments are forwarded verbatim to both paths, hence the long
// parameter list.
#[allow(clippy::too_many_arguments)]
pub async fn run_v3_turn_step_unified<H: V3TurnHost>(
    host: &mut H,
    turn: &mut TurnContext,
    client: &dyn LlmClient,
    mode: TurnLoopMode,
    tool_catalog: &mut [Tool],
    active_tool_names: &mut HashSet<String>,
    force_update_plan_first: bool,
    stream_retry_attempts: &mut u32,
    context_recovery_attempts: &mut u8,
    length_continuations: &mut u32,
    turn_error: &mut Option<String>,
    loop_guard: &mut LoopGuard,
    consecutive_tool_error_steps: u32,
    tool_registry: Option<&H::ToolRegistry>,
) -> V3StepOutcome {
    // Bind the result first so the borrows taken by the kernel attempt are
    // released before the fallback reuses them.
    let kernel_outcome = KernelTurnHost::try_run_v3_turn_step(
        host,
        turn,
        client,
        mode,
        tool_catalog,
        active_tool_names,
        force_update_plan_first,
        stream_retry_attempts,
        context_recovery_attempts,
        length_continuations,
        turn_error,
        loop_guard,
        consecutive_tool_error_steps,
        tool_registry,
    )
    .await;

    match kernel_outcome {
        Some(outcome) => outcome,
        None => {
            run_v3_step(
                host,
                turn,
                client,
                mode,
                tool_catalog,
                active_tool_names,
                force_update_plan_first,
                stream_retry_attempts,
                context_recovery_attempts,
                length_continuations,
                turn_error,
                loop_guard,
                consecutive_tool_error_steps,
                tool_registry,
            )
            .await
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a tool use with the given id and name, an empty JSON object as
    /// input, no caller, and an empty input buffer.
    fn tool_use(id: &'static str, name: &'static str) -> ToolUseState {
        ToolUseState {
            id: id.into(),
            name: name.into(),
            input: serde_json::json!({}),
            caller: None,
            input_buffer: String::new(),
        }
    }

    /// The returned IDs follow the order of the input tool uses.
    #[test]
    fn execute_batch_call_ids_preserves_order() {
        let uses = vec![tool_use("a", "read_file"), tool_use("b", "list_dir")];
        let expected = vec!["a".to_string(), "b".to_string()];
        assert_eq!(execute_batch_call_ids(&uses), expected);
    }
}