deepstrike_core/scheduler/state_machine/mod.rs
1use std::collections::HashMap;
2
3use super::milestone::MilestoneTracker;
4use super::policy::LoopPolicy;
5use super::tcb::{ScheduleDecision, TaskState, TaskTable, Tcb, WaitReason};
6use crate::AgentRunSpec;
7use crate::context::manager::ContextManager;
8use crate::governance::pipeline::GovernancePipeline;
9use crate::signals::router::SignalRouter;
10use crate::types::result::SubAgentResult;
11use crate::context::renderer::RenderedContext;
12// `pub use` so external integration tests that glob `state_machine::*` resolve the observation
13// type here — exactly as they did for the former `pub enum LoopObservation` this replaced.
14pub use crate::runtime::kernel::KernelObservation;
15use crate::runtime::session::RollbackReason;
16use crate::types::message::{
17 Content, ContentPart, Message, ToolCall, ToolErrorKind, ToolResult, ToolSchema,
18};
19use crate::types::milestone::MilestoneCheckResult;
20use crate::types::result::{LoopResult, TerminationReason};
21use crate::types::signal::RuntimeSignal;
22use crate::types::task::RuntimeTask;
23
24/// Compact digest of a tool call's arguments for the recency log (2b). Kept short and CJK-safe — it
25/// only needs to make `same-tool / different-args` calls distinguishable (so a legit loop isn't
26/// flagged as a no-progress repeat) and to read sensibly in the "just did: …" footer. Empty for
27/// no-arg / `{}` calls. Lives in the volatile State turn, so length here never churns the cache.
28fn compact_tool_args(args: &serde_json::Value) -> String {
29 if args.is_null() {
30 return String::new();
31 }
32 let s = args.to_string();
33 if s == "{}" {
34 return String::new();
35 }
36 const MAX: usize = 48;
37 if s.chars().count() <= MAX {
38 s
39 } else {
40 format!("{}…", s.chars().take(MAX).collect::<String>())
41 }
42}
43
/// The *turn step* of the L* execution loop (M1d).
///
/// Schedulability (`Ready/Running/Blocked/Suspended/Done`) is no longer carried here — it lives
/// on the root task's [`TaskState`] in the kernel's `TaskTable`, queried via
/// [`LoopStateMachine::lifecycle`]. `LoopPhase` is now orthogonal: it only records *which step of a
/// running turn* the loop is in. When the task is `Ready/Suspended/Done`, the phase value is
/// inert (left at its last step) and ignored.
#[derive(Debug, Clone)]
pub enum LoopPhase {
    /// About to ask (or asking) the model for its next response; set right before `CallLLM`.
    Reason,
    /// The model proposed `tool_calls`, which were emitted as `LoopAction::ExecuteTools`.
    Act { tool_calls: Vec<ToolCall> },
    /// Holds a batch of tool results.
    /// NOTE(review): not assigned anywhere in the visible part of this module — confirm usage.
    Observe { results: Vec<ToolResult> },
    /// Post-tool-results checkpoint; `pressure` is the context `rho()` measured after any
    /// idle time-decay compaction and before pressure-driven eviction runs.
    Delta { pressure: f64 },
}
58
/// Why the loop entered the `Suspended` lifecycle state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SuspendReason {
    /// Governance `AskUser` — waiting for SDK to resolve human approval.
    AskUser,
    /// Sub-agent spawned — waiting for sub-agent to complete.
    SubAgentAwait,
    /// Externally requested suspension.
    External,
}
69
/// What the loop is blocked waiting for (the `Blocked` lifecycle state).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockReason {
    /// Awaiting a tool's continuation (tool suspend pattern).
    ToolSuspend,
    /// Awaiting milestone evaluation result.
    MilestoneAwait,
}
78
/// Events fed into the state machine from the SDK layer via [`LoopStateMachine::feed`].
#[derive(Debug)]
pub enum LoopEvent {
    /// Begin a run for `task`; `feed` delegates to [`LoopStateMachine::start`].
    Start {
        task: RuntimeTask,
    },
    /// The model's reply to the most recent `LoopAction::CallLLM`.
    LLMResponse {
        message: Message,
    },
    /// Results for the most recent `LoopAction::ExecuteTools` batch.
    ToolResults {
        results: Vec<ToolResult>,
    },
    /// Inbound signal from SignalRouter — Critical/High urgency may interrupt.
    Signal {
        signal: RuntimeSignal,
    },
    /// Result of evaluating the current milestone phase's criteria.
    /// Feed this back after handling `LoopAction::EvaluateMilestone`.
    MilestoneResult {
        result: MilestoneCheckResult,
    },
    /// Sub-agent run completed — result is injected into the loop as context.
    SubAgentCompleted {
        result: SubAgentResult,
    },
    /// The pending operation timed out. `feed` rolls the turn back with
    /// `RollbackReason::Timeout`, surfaces a control note, and calls the model again.
    Timeout,
}
106
/// Actions the state machine outputs — SDK layer executes the I/O.
#[derive(Debug)]
pub enum LoopAction {
    /// Structured context ready for a provider call.
    /// `context.system_text` → provider system param.
    /// `context.turns` → provider messages array (strictly alternating).
    /// `tools` → tool schemas (skill / memory / knowledge / user tools).
    CallLLM {
        context: RenderedContext,
        tools: Vec<ToolSchema>,
    },
    /// Execute these tool calls, then feed back `LoopEvent::ToolResults`.
    ExecuteTools {
        calls: Vec<ToolCall>,
    },
    /// The loop has terminated; `result` carries the termination reason, the final message
    /// (if any), and turn/token usage.
    Done {
        result: LoopResult,
    },
    /// Kernel requests the SDK to evaluate the current milestone phase.
    ///
    /// The SDK should assess `criteria` against the agent's output using the
    /// specified `verifier`, then feed back `LoopEvent::MilestoneResult { result }`.
    EvaluateMilestone {
        phase_id: String,
        criteria: Vec<String>,
        verifier: Option<crate::types::milestone::MilestoneVerifier>,
        required_evidence: Vec<String>,
    },
    /// Kernel is suspended — SDK must resolve (e.g. human approval) and feed `Resume`.
    /// NOTE(review): `Resume` is not a `LoopEvent` variant; presumably it is delivered through
    /// `KernelRuntime` — confirm.
    AwaitingResume,
}
137
/// Payload held while the loop is in `Suspended`; stored in `LoopStateMachine::suspend_state`.
#[derive(Debug, Clone)]
pub(super) enum SuspendState {
    /// Governance AskUser — awaiting `Resume { approved_calls, denied_calls }`.
    /// `gated_reasons` is keyed by a string per gated call (presumably the call id — confirm).
    AskUser {
        calls: Vec<ToolCall>,
        gated_reasons: HashMap<String, String>,
    },
    /// Sub-agent spawn — awaiting `SubAgentCompleted` for each listed agent id.
    SubAgentAwait {
        agent_ids: Vec<String>,
    },
}
151
/// Verdict of passing a model-proposed tool-call batch through `gate_tool_calls`.
pub(super) enum GateToolOutcome {
    /// The batch may be emitted as `LoopAction::ExecuteTools`.
    Proceed,
    /// Execution is refused; the caller returns the carried action instead.
    Blocked(LoopAction),
    /// The loop has been suspended; the caller returns `LoopAction::AwaitingResume`.
    Suspended,
}
157
/// Snapshot of context lengths captured just before each LLM call.
/// Used internally to restore state on rollback.
#[derive(Debug, Clone, Default)]
pub struct TurnCheckpoint {
    /// Length of the history partition when the checkpoint was taken.
    pub history_len: usize,
    /// Length of the signals partition when the checkpoint was taken.
    pub signals_len: usize,
    /// Saved task-state partition, if captured. Note: this is the *context* task-state type,
    /// not the scheduler `TaskState` imported at the top of this module.
    pub task_state: Option<crate::context::task_state::TaskState>,
}
166
/// Pure state machine for the L* execution loop. No I/O — only state transitions.
///
/// Internal engine backing [`crate::runtime::KernelRuntime`]. Exposed for in-crate
/// use and tests; external callers should drive the kernel through `KernelRuntime`.
#[doc(hidden)]
pub struct LoopStateMachine {
    /// Current intra-turn step (see [`LoopPhase`]); inert unless the root task is `Running`.
    pub phase: LoopPhase,
    /// Completed tool-result turns; `feed(ToolResults)` increments it once per batch.
    pub turn: u32,
    /// Context partitions (system, knowledge, history, task state, signals) and token engine.
    pub ctx: ContextManager,
    /// Tool schemas for the model. `restore` leaves this empty, noting that tools are rebuilt
    /// from capabilities on the next LLM call.
    pub tools: Vec<ToolSchema>,
    /// Observations emitted by the current `start`/`feed` call (cleared at the beginning of each);
    /// drained by `take_observations`.
    pub observations: Vec<KernelObservation>,
    /// Loop policy; cloned into the root task and into every transient `root_tcb()` budget view.
    pub(super) policy: LoopPolicy,
    /// Tokens charged by this vehicle: each LLM response, plus each tool result's reported
    /// `token_count` (0 when absent).
    pub(super) total_tokens: u64,
    /// L1 (RunGroup): cumulative tokens spent by *other* members of this run's governance domain,
    /// seeded at boot via `seed_group_budget`. The run-level token cap is enforced against
    /// `group_tokens_base + total_tokens` so the budget spans the whole group, not one vehicle.
    /// 0 (default) ⇒ no group (N=1) ⇒ pre-L1 per-kernel behavior (byte-identical).
    pub(super) group_tokens_base: u64,
    /// L1 (RunGroup): sub-agents spawned by *other* members of this run's governance domain, seeded
    /// at boot. `max_total_subagents` is enforced against `group_spawns_base + local spawns`. 0 ⇒ N=1.
    pub(super) group_spawns_base: u32,
    /// When set, the next LLM call strips tools to force a text response,
    /// then terminates with this reason once the response arrives.
    pub(super) pending_termination: Option<TerminationReason>,
    /// Number of history messages present at session start (after preload_history).
    /// drain_new_messages() returns the slice from this offset onward.
    pub(super) session_history_baseline: usize,
    /// Context lengths captured before each LLM call; used to restore state on rollback.
    pub(super) checkpoint: TurnCheckpoint,
    /// Milestone contract tracker (extracted to reduce state machine bloat).
    pub(super) milestone: MilestoneTracker,
    /// Run specification, if configured; written into `snapshot` and read back by `restore`.
    pub run_spec: Option<AgentRunSpec>,
    /// M1 consolidation: the single source of truth for schedulability *and* sub-agent lineage.
    /// Root is task `"root"`; each sub-agent is a child task carrying its `ProcInfo`. The former
    /// `ProcessTable` is now a derived view over this (`agent_process(es)` rebuild `AgentProcess`
    /// rows on demand via `AgentProcess::from_tcb`).
    pub(super) tasks: TaskTable,
    /// Optional governance pipeline. When set, every tool call proposed by the
    /// model is evaluated before `ExecuteTools` is emitted. `None` (default)
    /// skips the gate entirely, preserving the pre-governance behavior.
    pub(super) governance: Option<GovernancePipeline>,
    /// Optional resource quota evaluated at the syscall trap (M2). `None` (default) leaves spawn /
    /// memory syscalls unconditionally allowed, preserving pre-M2 behavior.
    pub(super) resource_quota: Option<crate::governance::quota::ResourceQuota>,
    /// Timestamps of recent allowed `WriteMemory` syscalls, for the rolling-window rate limit.
    /// Only populated when `resource_quota.memory_writes_per_window` is set.
    pub(super) memory_write_times: Vec<u64>,
    /// Optional long-term memory policy (`set_memory_policy`). `None` (default) preserves
    /// pre-policy behavior: default-rule validation + verbatim retrieval `top_k`.
    pub(super) memory_policy: Option<crate::mm::memory::MemoryPolicy>,
    /// Optional in-kernel signal router. When set, inbound signals are routed
    /// through dedup + attention policy + queue here (the kernel owns disposition).
    /// `None` keeps the legacy hardcoded urgency handling in `feed`. Both `new` and `restore`
    /// install one.
    pub(super) signal_router: Option<SignalRouter>,
    /// Wall-clock timestamp of the first `ProviderResult.now_ms` received.
    /// Used by the wall-time budget axis in `SchedulerBudget::should_terminate`.
    pub(super) started_at_ms: Option<u64>,
    /// Most-recent `now_ms` value from `ProviderResult`, forwarded to the budget check.
    pub(super) last_now_ms: Option<u64>,
    /// Tool batch awaiting `Resume` after an AskUser suspend.
    pub(super) suspend_state: Option<SuspendState>,
    /// Denied tool results to merge into the next `ToolResults` feed after resume.
    pub(super) pending_denied_results: Vec<ToolResult>,
    /// W0: an in-flight workflow DAG, when one is loaded. The kernel spawns its ready nodes as
    /// gated batches (each through `evaluate_syscall(Syscall::Spawn)`) and advances on
    /// completions. `None` (default) preserves the single-spawn `spawn_sub_agent` behavior.
    pub(super) workflow: Option<crate::orchestration::workflow::WorkflowRun>,
}
234
235mod signal;
236mod capability;
237mod gate;
238mod eviction;
239mod process;
240mod workflow;
241mod milestone_exec;
242
243impl LoopStateMachine {
244 fn message_tokens(&self, message: &Message) -> u32 {
245 message
246 .token_count
247 .unwrap_or_else(|| self.ctx.engine.count_message(message))
248 }
249
250 pub fn new(policy: LoopPolicy) -> Self {
251 let mut tasks = TaskTable::new();
252 // M1d: the root task carries the authoritative schedulability lifecycle. It starts
253 // `Ready`; `start()`/`resume_*` flip it to `Running`, suspends set `Suspended`, and
254 // `terminate()` sets `Done`. `phase` is now only the intra-turn step.
255 tasks.insert(Tcb::root("root", policy.clone()));
256 Self {
257 // Inert placeholder step; meaningful only while the root task is `Running`.
258 phase: LoopPhase::Reason,
259 turn: 0,
260 ctx: ContextManager::new(policy.max_tokens),
261 tools: Vec::new(),
262 observations: Vec::new(),
263 policy,
264 total_tokens: 0,
265 group_tokens_base: 0,
266 group_spawns_base: 0,
267 pending_termination: None,
268 session_history_baseline: 0,
269 checkpoint: TurnCheckpoint::default(),
270 milestone: MilestoneTracker::new(),
271 run_spec: None,
272 tasks,
273 governance: None,
274 resource_quota: None,
275 memory_write_times: Vec::new(),
276 memory_policy: None,
277 signal_router: Some(SignalRouter::new(64)),
278 started_at_ms: None,
279 last_now_ms: None,
280 suspend_state: None,
281 pending_denied_results: Vec::new(),
282 workflow: None,
283 }
284 }
285
286 /// The authoritative schedulability lifecycle of the loop (root task state). Replaces the
287 /// removed `LoopPhase::{Idle,Suspended,Blocked,Terminal}` reads.
288 pub fn lifecycle(&self) -> TaskState {
289 self.tasks.get("root").map(|t| t.state).unwrap_or(TaskState::Ready)
290 }
291
292 /// The wait reason while suspended/blocked, if any.
293 pub fn wait_reason(&self) -> Option<WaitReason> {
294 self.tasks.get("root").and_then(|t| t.wait.clone())
295 }
296
297 /// Whether the loop has terminated.
298 pub fn is_terminal(&self) -> bool {
299 matches!(self.lifecycle(), TaskState::Done(_))
300 }
301
302 /// Whether the loop is suspended awaiting external resolution.
303 pub fn is_suspended(&self) -> bool {
304 matches!(self.lifecycle(), TaskState::Suspended)
305 }
306
307 /// Set the root task's lifecycle (and wait reason). Single mutation point for schedulability.
308 fn set_lifecycle(&mut self, state: TaskState, wait: Option<WaitReason>) {
309 if let Some(root) = self.tasks.get_mut("root") {
310 root.state = state;
311 root.wait = wait;
312 } else {
313 let mut root = Tcb::root("root", self.policy.clone());
314 root.state = state;
315 root.wait = wait;
316 self.tasks.insert(root);
317 }
318 }
319
320 /// Build a transient root [`Tcb`] mirroring the current scheduling facts (budget counters,
321 /// wall-clock anchors, lifecycle). M1b uses this to run the pure `schedule()` spine in
322 /// parallel with the legacy budget path; later milestones promote it to the live task row.
323 fn root_tcb(&self) -> Tcb {
324 let mut tcb = Tcb::root("root", self.policy.clone());
325 tcb.budget.turns = self.turn;
326 // L1: the token-budget axis is evaluated against the whole governance domain's cumulative
327 // spend (this vehicle's `total_tokens` plus other members' `group_tokens_base`).
328 tcb.budget.total_tokens = self.total_tokens.saturating_add(self.group_tokens_base);
329 tcb.budget.started_at_ms = self.started_at_ms;
330 tcb.state = self.lifecycle();
331 tcb
332 }
333
334 /// Adjust the wall-clock budget axis at runtime.
335 pub fn set_wall_budget(&mut self, max_wall_ms: Option<u64>) {
336 self.policy.max_wall_ms = max_wall_ms;
337 }
338
339 /// Install a governance pipeline. Once set, all model-proposed tool calls
340 /// are evaluated before execution. Denied/rate-limited calls roll the turn
341 /// back (reusing the `GovernanceDenied` path); `AskUser` calls surface a
342 /// `ToolGated` observation for the SDK to enforce.
343 pub fn set_governance(&mut self, pipeline: GovernancePipeline) {
344 self.governance = Some(pipeline);
345 }
346
347 /// Install resource quotas (M2). Once set, `Spawn` and `WriteMemory` syscalls are bounded by
348 /// the quota at the trap. Not setting it (the default) leaves them unconditionally allowed.
349 pub fn set_resource_quota(&mut self, quota: crate::governance::quota::ResourceQuota) {
350 self.resource_quota = Some(quota);
351 }
352
353 /// L1 (RunGroup): seed the cumulative tokens already spent by other members of this run's
354 /// governance domain. The run-level token cap is then enforced against the group total. Seeding
355 /// 0 (the default) preserves pre-L1 per-vehicle behavior.
356 pub fn seed_group_budget(&mut self, tokens_spent: u64) {
357 self.group_tokens_base = tokens_spent;
358 }
359
360 /// L1 (RunGroup): seed the sub-agents already spawned by other members of this run's governance
361 /// domain. `max_total_subagents` is then enforced against the group total. 0 ⇒ pre-L1 behavior.
362 pub fn seed_group_spawns(&mut self, subagents_spawned: u32) {
363 self.group_spawns_base = subagents_spawned;
364 }
365
366 /// L1: this vehicle's cumulative sub-agent spawns this run — every child task ever registered in
367 /// the `TaskTable` (running + completed), distinct from the *instantaneous* running count. Used
368 /// for the cumulative spawn quota and read back by the SDK to charge the group ledger at run end.
369 pub fn local_subagents_spawned(&self) -> u32 {
370 self.tasks.all().iter().filter(|t| t.proc.is_some()).count() as u32
371 }
372
373 /// Install the long-term memory policy (`set_memory_policy`). Once set it gates `write_memory`
374 /// validation and bounds `query_memory` retrieval breadth. Not setting it (the default)
375 /// preserves pre-policy behavior.
376 pub fn set_memory_policy(&mut self, policy: crate::mm::memory::MemoryPolicy) {
377 self.memory_policy = Some(policy);
378 }
379
380 /// The installed memory policy, if any. `None` means default-rule validation + verbatim top_k.
381 pub fn memory_policy(&self) -> Option<&crate::mm::memory::MemoryPolicy> {
382 self.memory_policy.as_ref()
383 }
384
385 /// Feed the current wall-clock time (ms) to scheduler/governance budget axes.
386 pub fn set_observed_time(&mut self, now_ms: u64) {
387 if self.started_at_ms.is_none() {
388 self.started_at_ms = Some(now_ms);
389 }
390 self.last_now_ms = Some(now_ms);
391 if let Some(pipeline) = self.governance.as_mut() {
392 pipeline.set_time(now_ms);
393 }
394 }
395
396 /// Pre-populate the history partition with messages from a prior session.
397 ///
398 /// Call **before** `start()` when resuming a conversation. Sets the baseline
399 /// so `drain_new_messages()` returns only the messages from the current run.
400 pub fn preload_history(&mut self, messages: Vec<Message>) {
401 for msg in messages {
402 let tokens = self.message_tokens(&msg);
403 self.ctx.push_history(msg, tokens);
404 }
405 self.session_history_baseline = self.ctx.partitions.history.messages.len();
406 }
407
408 /// Continue from preloaded history without appending a new user turn.
409 /// Use after `preload_history` when recovering a session that ended mid-run.
410 ///
411 /// If the last assistant turn has tool calls without matching tool results,
412 /// resumes with `ExecuteTools` instead of calling the LLM again.
413 pub fn resume_after_preload(&mut self) -> LoopAction {
414 self.observations.clear();
415 let calls = crate::runtime::repair::pending_tool_calls_from_messages(
416 &self.ctx.partitions.history.messages,
417 );
418 if !calls.is_empty() {
419 self.emit_page_in_requested(&calls);
420 self.phase = LoopPhase::Act {
421 tool_calls: calls.clone(),
422 };
423 self.set_lifecycle(TaskState::Running, None);
424 return LoopAction::ExecuteTools { calls };
425 }
426 self.phase = LoopPhase::Reason;
427 self.emit_call_llm()
428 }
429
430 /// Return all messages added to history during the current run
431 /// (since the last `preload_history` call or since construction).
432 ///
433 /// Call after `LoopAction::Done` to get the complete turn transcript
434 /// for persistence to a SessionStore.
435 pub fn drain_new_messages(&self) -> Vec<Message> {
436 let history = &self.ctx.partitions.history.messages;
437 let start = self.session_history_baseline.min(history.len());
438 history[start..].to_vec()
439 }
440
441 pub fn start(&mut self, task: RuntimeTask) -> LoopAction {
442 self.observations.clear();
443 self.ctx.init_task(task.goal.clone(), task.criteria.clone());
444
445 let user_msg = "Proceed with the task described in [TASK STATE].".to_string();
446
447 // User message goes into history so it appears at the correct chronological
448 // position: [prior turns...] → [current user message] — LLM reads left-to-right
449 // and responds to the last message. working is reserved for runtime signals only.
450 // Estimate tokens (1 token ≈ 4 chars) with a minimum of 1 so the renderer
451 // does not skip this message (it skips zero-token entries).
452 let user_tokens = self.ctx.engine.count(&user_msg).max(1);
453 self.ctx.push_history(Message::user(user_msg), user_tokens);
454 self.phase = LoopPhase::Reason;
455 // Root task (seeded `Ready` in `new()`) becomes `Running`; `emit_call_llm` sets it.
456 self.emit_call_llm()
457 }
458
    /// Advance the loop by one external event and return the next I/O action for the SDK.
    ///
    /// Every call first clears the observation buffer, so a following `take_observations` yields
    /// only what this call emitted. It then sweeps expired leases. Per event:
    ///
    /// - `Start`: delegates to [`LoopStateMachine::start`].
    /// - `LLMResponse`: charges the message's tokens to `total_tokens`, then:
    ///   - an armed `pending_termination` terminates with that reason;
    ///   - otherwise a tool-free reply either requests milestone evaluation (when
    ///     `milestone.is_complete()` is false) or terminates with `Completed`;
    ///   - otherwise the tool calls are logged, gated, and emitted as `ExecuteTools`.
    /// - `ToolResults`: merges held-back denied results, then:
    ///   - if any result maps to a rollback reason, rolls the turn back;
    ///   - otherwise commits each result to history (spooling oversized output), bumps `turn`,
    ///     runs the budget check, eviction and renewal, drains queued signals, and calls the
    ///     model again.
    /// - `Signal`: routed via `dispatch_signal`, falling back to a plain `CallLLM`.
    /// - `MilestoneResult` / `SubAgentCompleted`: delegated to their handlers.
    /// - `Timeout`: rolls back with `RollbackReason::Timeout` and calls the model again.
    pub fn feed(&mut self, event: LoopEvent) -> LoopAction {
        self.observations.clear();
        self.sweep_expired_leases();

        match event {
            LoopEvent::Start { task } => self.start(task),

            LoopEvent::LLMResponse { message } => {
                let tokens = self.message_tokens(&message);
                self.total_tokens += tokens as u64;

                // A budget stop was armed on a previous turn (tools were stripped for this call),
                // so this response is the final text: terminate with the armed reason.
                if let Some(reason) = self.pending_termination.take() {
                    return self.terminate(reason, Some(message));
                }

                if message.tool_calls.is_empty() {
                    // When a milestone contract is active and not yet complete,
                    // request evaluation instead of terminating.
                    if !self.milestone.is_complete() {
                        let phase_id = self.milestone.current_phase_id().unwrap_or("").to_string();
                        let criteria = self.milestone.current_criteria().to_vec();
                        // Verifier and required evidence of the current phase; both default when
                        // there is no contract or no phase at `current_phase`.
                        let (verifier, required_evidence) = self
                            .milestone
                            .contract
                            .as_ref()
                            .and_then(|c| c.phases.get(self.milestone.current_phase))
                            .map(|p| (p.verifier.clone(), p.required_evidence.clone()))
                            .unwrap_or_default();
                        // `tokens` was already computed for this message above.
                        self.ctx.push_history(message, tokens);
                        return LoopAction::EvaluateMilestone {
                            phase_id,
                            criteria,
                            verifier,
                            required_evidence,
                        };
                    }
                    return self.terminate(TerminationReason::Completed, Some(message));
                }

                let calls = message.tool_calls.clone();
                self.ctx.push_history(message, tokens);

                // Record activity time (consumed by the Layer 3 time-decay check).
                if let Some(now_ms) = self.last_now_ms {
                    self.ctx.record_activity(now_ms);
                }

                // 2b: record this turn's tool activity into the task-state recency log (meta-tools
                // filtered inside). The State-turn footer renders it as "just did: …" + a forward
                // nudge / STOP, so progress is kernel-derived and never depends on the model
                // remembering to call `update_plan`. Tool *names* live only on the request (results
                // carry call_id only), so this is the turn to capture them.
                //
                // Capture name AND a compact arg digest: the no-progress STOP keys on whether the
                // SAME call repeats, and a legit loop (same tool, DIFFERENT args — e.g. processing 20
                // items) is real progress, not a stall. Keying on the name alone false-positives those
                // loops; including args distinguishes "step(n=1), step(n=2)…" from a true repeat.
                let action_sigs: Vec<(String, String)> = calls
                    .iter()
                    .map(|c| (c.name.to_string(), compact_tool_args(&c.arguments)))
                    .collect();
                self.ctx.note_tool_actions(&action_sigs);

                // Governance gate: a blocked batch returns the gate's action, a suspended one
                // waits for the SDK; only `Proceed` reaches `ExecuteTools`.
                match self.gate_tool_calls(&calls) {
                    GateToolOutcome::Blocked(action) => return action,
                    GateToolOutcome::Suspended => return LoopAction::AwaitingResume,
                    GateToolOutcome::Proceed => {}
                }
                self.emit_page_in_requested(&calls);
                self.phase = LoopPhase::Act {
                    tool_calls: calls.clone(),
                };
                self.set_lifecycle(TaskState::Running, None);
                LoopAction::ExecuteTools { calls }
            }

            LoopEvent::ToolResults { mut results } => {
                // Fold in denied results held back after an AskUser resume.
                if !self.pending_denied_results.is_empty() {
                    results.append(&mut self.pending_denied_results);
                }
                // The first result that maps to a rollback reason discards the turn: roll back,
                // surface the control note as a signal, and ask the model again.
                if let Some(reason) = results
                    .iter()
                    .find_map(|result| self.rollback_reason_for_tool_result(result))
                {
                    // The note is built before `rollback` takes ownership of `reason`.
                    let note = Message::user(super::rollback::build_rollback_note(
                        &reason,
                        self.ctx.config.verbose_control_notes,
                    ));
                    self.rollback(reason);
                    self.ctx.push_signal(note.content.as_text().unwrap_or_default().to_string());
                    self.phase = LoopPhase::Reason;
                    return self.emit_call_llm();
                }
                // Non-fatal errors are committed to history so the LLM can
                // see them and self-correct without losing turn state.

                for r in &results {
                    // NOTE(review): a result without `token_count` adds 0 to the budget here, even
                    // though a token count is computed for its history entry below — confirm this
                    // asymmetry with the `LLMResponse` path (which falls back to counting) is intended.
                    self.total_tokens += r.token_count.unwrap_or(0) as u64;
                    // Preserve Content::Parts (structured / multimodal tool output).
                    // Parts are serialised to JSON so the text can be restored faithfully.
                    let raw_output = match &r.output {
                        Content::Text(s) => s.clone(),
                        Content::Parts(parts) => serde_json::to_string(parts).unwrap_or_default(),
                    };
                    // Layer 1 spool: oversized results keep only a preview in context; the kernel
                    // emits `LargeResultSpooled` so the SDK persists the full output it still holds.
                    let (output, spooled) = match crate::mm::plan_spool(
                        &raw_output,
                        self.ctx.config.spool_threshold_bytes,
                        self.ctx.config.spool_preview_bytes,
                    ) {
                        Some(decision) => {
                            self.observations.push(KernelObservation::LargeResultSpooled {
                                turn: self.turn,
                                call_id: r.call_id.to_string(),
                                // ToolResult carries no tool name; the SDK maps call_id -> tool.
                                tool: String::new(),
                                original_size: decision.original_size,
                                preview_size: decision.preview.len() as u32,
                                spool_ref: None,
                            });
                            (decision.preview, true)
                        }
                        None => (raw_output, false),
                    };
                    let parts = vec![ContentPart::ToolResult {
                        call_id: r.call_id.clone(),
                        output,
                        is_error: r.is_error,
                    }];
                    let tool_msg = Message::tool(parts);
                    // When spooled, `r.token_count` reflects the full output — recount the preview.
                    let tokens = if spooled {
                        self.ctx.engine.count_message(&tool_msg)
                    } else {
                        r.token_count
                            .unwrap_or_else(|| self.ctx.engine.count_message(&tool_msg))
                    };
                    self.ctx.push_history(tool_msg, tokens);
                    // Layer 1: a spooled result's handle is marked SpooledOut (its full output now
                    // lives on disk via the SDK); the SDK maps call_id -> the persisted ref.
                    if spooled {
                        self.ctx.mark_spooled(&r.call_id, r.call_id.to_string());
                    }
                }
                self.turn += 1;

                // M1 consolidation: the pure `schedule()` is now the single budget decision point.
                // It evaluates the same three axes (turn/token/wall) via `BudgetLedger`, which
                // delegates to `SchedulerBudget::should_terminate` internally — one source of truth.
                // On exhaustion the stop is only *armed*: one more (tool-less) LLM call produces the
                // final text, and the `LLMResponse` arm then terminates with `term`.
                if let ScheduleDecision::Terminate { reason: term, .. } =
                    super::tcb::schedule(&self.root_tcb(), self.last_now_ms)
                {
                    let budget = match term {
                        TerminationReason::MaxTurns => "max_turns",
                        TerminationReason::Timeout => "wall_time",
                        _ => "token_budget",
                    };
                    self.observations.push(KernelObservation::BudgetExceeded {
                        turn: self.turn,
                        budget: budget.to_string(),
                    });
                    self.pending_termination = Some(term);
                    self.phase = LoopPhase::Reason;
                    return self.emit_call_llm();
                }

                // Eviction checkpoint (M3): one decision model (`plan_eviction`), one
                // execution funnel (`execute_eviction_op`). Layer 3 (idle/time-decay) must run
                // before the rho recommendation is read, since it mutates token usage — so the
                // plan is built in that interleaved order and the ops are executed in plan order.
                let idle_decay = self
                    .last_now_ms
                    .is_some_and(|now_ms| self.ctx.should_time_decay_compact(now_ms));
                if idle_decay {
                    self.execute_eviction_op(&crate::mm::EvictionOp::TimeDecayMicro);
                }

                // Layer 4 read-time projection: recompute handle residency on the post-time-decay rho.
                self.ctx.recompute_handle_residency();
                self.phase = LoopPhase::Delta {
                    pressure: self.ctx.rho(),
                };

                // Layers 2/4/5: execute the pressure-driven ops from the plan (skip TimeDecayMicro
                // if already executed). The plan carries specific ops stamped with real config-derived
                // params (W1-1 consolidation — no magic-number placeholders), not the umbrella
                // `Pressure` wrapper.
                let (target_tokens, preserve_turns) = self.ctx.plan_compaction_params();
                let plan =
                    crate::mm::plan_eviction(self.ctx.should_compress(), idle_decay, target_tokens, preserve_turns);
                // `idle_decay` ⇒ the plan carries a `TimeDecayMicro` (so the skip-on-already-executed
                // below is meaningful). The converse does NOT hold: a pressure-driven `MicroCompact`
                // also emits `TimeDecayMicro` independent of `idle_decay` (W1 unified planner), so we
                // assert the implication, not equality.
                debug_assert!(!idle_decay || plan.has_time_decay());
                for op in &plan.ops {
                    // Skip TimeDecayMicro if we already executed it (prevents double-execution).
                    if matches!(op, crate::mm::EvictionOp::TimeDecayMicro) && idle_decay {
                        continue;
                    }
                    self.execute_eviction_op(op);
                }

                // Renewal: when compression alone cannot recover enough headroom,
                // start a new sprint — carry forward system + memory + last N history turns.
                if self.ctx.should_renew() {
                    self.ctx.renew();
                    // A new sprint is a session boundary for signal identity: clear the dedup set so
                    // it cannot grow unbounded across a long run, and so a signal seen in a prior
                    // sprint may legitimately re-fire in the new one.
                    if let Some(router) = self.signal_router.as_mut() {
                        router.clear_dedup();
                    }
                    self.observations.push(KernelObservation::Renewed {
                        sprint: self.ctx.sprint,
                    });
                }

                // Turn boundary: drain any kernel-queued signals into context so they
                // are seen on the next reasoning turn (ready queue → running).
                self.drain_queued_signals();

                self.phase = LoopPhase::Reason;
                self.emit_call_llm()
            }

            LoopEvent::Signal { signal } => {
                // `feed` always returns an action; non-actionable dispositions
                // (queue/observe/ignore) fall back to a plain provider call here.
                // The kernel-routed path (`dispatch_signal`) is driven via the ABI.
                self.dispatch_signal(signal)
                    .unwrap_or_else(|| self.emit_call_llm())
            }

            LoopEvent::MilestoneResult { result } => self.handle_milestone_result(result),

            LoopEvent::SubAgentCompleted { result } => self.handle_sub_agent_completed(result),

            LoopEvent::Timeout => {
                // Same rollback shape as a rollback-worthy tool result, with a fixed reason.
                let reason = RollbackReason::Timeout;
                let note = Message::user(super::rollback::build_rollback_note(
                    &reason,
                    self.ctx.config.verbose_control_notes,
                ));
                self.rollback(reason);
                self.ctx.push_signal(note.content.as_text().unwrap_or_default().to_string());
                self.phase = LoopPhase::Reason;
                self.emit_call_llm()
            }
        }
    }
711
712
713 /// Drain observations emitted during the last `start`/`feed` call.
714 pub fn take_observations(&mut self) -> Vec<KernelObservation> {
715 std::mem::take(&mut self.observations)
716 }
717
718 /// W2-2: Create a snapshot of the current kernel state for crash recovery or migration.
719 pub fn snapshot(&self) -> crate::runtime::snapshot::KernelSnapshot {
720 use crate::runtime::snapshot::{ContextSnapshot, KernelSnapshot};
721 let context = ContextSnapshot::from_context(&self.ctx);
722 KernelSnapshot::from_state(
723 self.turn,
724 self.total_tokens,
725 &self.tasks,
726 &context,
727 self.run_spec.as_ref(),
728 )
729 }
730
731 /// W2-2: Restore kernel state from a snapshot. Returns a new LoopStateMachine rebuilt from the snapshot.
732 /// Note: This is a foundational restore - some state (governance, milestone, signal router dedup) is
733 /// recreated from policy/config rather than serialized, following the principle that strategy is data.
734 pub fn restore(snap: &crate::runtime::snapshot::KernelSnapshot) -> Self {
735 use crate::signals::router::SignalRouter;
736
737 // Reconstruct policy from the max_tokens in snapshot
738 let policy = crate::scheduler::policy::LoopPolicy {
739 max_tokens: snap.context.max_tokens,
740 ..Default::default()
741 };
742
743 // Rebuild TaskTable from snapshot TCBs
744 let mut tasks = TaskTable::new();
745 for tcb_snap in &snap.tasks {
746 if let Some(tcb) = snap.restore_tcb(tcb_snap) {
747 tasks.insert(tcb);
748 }
749 }
750
751 // Rebuild context partitions from snapshot
752 let mut ctx = ContextManager::new(snap.context.max_tokens);
753 ctx.sprint = snap.context.sprint;
754
755 // Restore messages
756 for msg in &snap.context.system_messages {
757 let tokens = ctx.engine.count_message(msg);
758 ctx.partitions.system.push(msg.clone(), tokens);
759 }
760 for msg in &snap.context.knowledge_messages {
761 let tokens = ctx.engine.count_message(msg);
762 ctx.partitions.knowledge.push(msg.clone(), tokens);
763 }
764 for msg in &snap.context.history_messages {
765 let tokens = ctx.engine.count_message(msg);
766 ctx.partitions.history.push(msg.clone(), tokens);
767 }
768
769 // Restore task state
770 if let Some(goal) = &snap.context.task_goal {
771 ctx.partitions.task_state.goal = goal.clone();
772 }
773 if let Some(plan_json) = &snap.context.task_plan {
774 if let Ok(plan_steps) = serde_json::from_str::<Vec<crate::context::task_state::PlanStep>>(plan_json) {
775 ctx.partitions.task_state.plan = plan_steps;
776 }
777 }
778 if let Some(progress) = &snap.context.task_progress {
779 ctx.partitions.task_state.progress = progress.clone();
780 }
781 ctx.partitions.task_state.directives = snap.context.task_directives.clone();
782
783 // Restore signals
784 ctx.partitions.signals = snap.context.signals.clone();
785
786 Self {
787 phase: LoopPhase::Reason,
788 turn: snap.turn,
789 ctx,
790 tools: Vec::new(), // Tools are rebuilt from capabilities on next LLM call
791 observations: Vec::new(),
792 policy,
793 total_tokens: snap.total_tokens,
794 // Re-seeded from the replayed `ConfigureRun` (strategy is data, not serialized state).
795 group_tokens_base: 0,
796 group_spawns_base: 0,
797 pending_termination: None,
798 session_history_baseline: 0,
799 checkpoint: TurnCheckpoint::default(),
800 milestone: crate::scheduler::milestone::MilestoneTracker::new(),
801 run_spec: snap.run_spec(),
802 tasks,
803 governance: None, // Governance is policy data, recreated from config
804 resource_quota: None,
805 memory_write_times: Vec::new(),
806 memory_policy: None,
807 signal_router: Some(SignalRouter::new(64)), // Dedup cleared on restore
808 started_at_ms: None,
809 last_now_ms: None,
810 suspend_state: None,
811 pending_denied_results: Vec::new(),
812 workflow: None,
813 }
814 }
815
816 fn terminate(
817 &mut self,
818 termination: TerminationReason,
819 final_message: Option<Message>,
820 ) -> LoopAction {
821 // Commit the final response into history so subsequent session restores
822 // include the complete transcript: user → [tool turns] → final assistant.
823 if let Some(ref msg) = final_message {
824 let tokens = self.message_tokens(msg);
825 self.ctx.push_history(msg.clone(), tokens);
826 }
827 let result = LoopResult {
828 termination,
829 final_message,
830 turns_used: self.turn,
831 total_tokens_used: self.total_tokens,
832 loop_continue: None,
833 classify_branch: None,
834 tournament_winner: None,
835 };
836 self.set_lifecycle(TaskState::Done(termination), None);
837 LoopAction::Done { result }
838 }
839
840 /// Build the `CallLLM` action with a structured `RenderedContext`.
841 /// Meta-tools (skill / memory / knowledge) are appended to the tool list
842 /// when configured. When `pending_termination` is set, tools are stripped
843 /// to force a plain-text response before the loop terminates.
844 fn emit_call_llm(&mut self) -> LoopAction {
845 // Calling the provider is definitionally "running" — the single funnel for entering the
846 // Running lifecycle (covers start, resume, signal-driven turns, budget final-call).
847 self.set_lifecycle(TaskState::Running, None);
848 self.checkpoint.history_len = self.ctx.partitions.history.messages.len();
849 self.checkpoint.signals_len = self.ctx.partitions.signals.len();
850 self.checkpoint.task_state = Some(self.ctx.partitions.task_state.clone());
851 self.observations.push(KernelObservation::CheckpointTaken {
852 turn: self.turn,
853 history_len: self.checkpoint.history_len as u32,
854 });
855
856 let context = self.ctx.render();
857 if self.pending_termination.is_some() {
858 return LoopAction::CallLLM {
859 context,
860 tools: Vec::new(),
861 };
862 }
863 let mut tools = self.tools.clone();
864 tools.extend(self.ctx.meta_tool_schemas());
865
866 if let Some(ref spec) = self.run_spec {
867 use crate::types::capability::CapabilityKind;
868 tools.retain(|tool| {
869 let kind = match tool.name.as_str() {
870 "skill" => CapabilityKind::Skill,
871 "memory" => CapabilityKind::Memory,
872 "knowledge" => CapabilityKind::Knowledge,
873 _ => CapabilityKind::Tool,
874 };
875 let desc = crate::types::capability::CapabilityDescriptor::marker(
876 kind,
877 tool.name.clone(),
878 &tool.description,
879 );
880 spec.capability_filter.allows(&desc)
881 });
882 }
883
884 // P1-B epoch skill gating (applied *after* the run-level filter ③, so A is the outer bound
885 // and B narrows within it — D6). When skills are active and declare tools, expose only
886 // `meta-tools ∪ stable-core ∪ ⋃(active skills' allowed_tools)`. `None` ⇒ no active/declared
887 // skill ⇒ no narrowing (D3, errs-open). Meta-tools are always exempt (D5) so the model can
888 // still load more skills. Byte-stable within an epoch: the set only changes on activation.
889 if let Some(allowed) = self.ctx.active_skill_tool_filter() {
890 let stable = &self.ctx.stable_core_tools;
891 tools.retain(|tool| {
892 matches!(tool.name.as_str(), "skill" | "memory" | "knowledge" | "update_plan")
893 || stable.contains(&tool.name)
894 || allowed.contains(&tool.name)
895 });
896 }
897
898 LoopAction::CallLLM { context, tools }
899 }
900
901 pub fn rollback(&mut self, reason: RollbackReason) {
902 self.ctx.partitions.history.messages.truncate(self.checkpoint.history_len);
903 self.ctx.partitions.signals.truncate(self.checkpoint.signals_len);
904 if let Some(ref state) = self.checkpoint.task_state {
905 self.ctx.partitions.task_state = state.clone();
906 }
907 self.observations.push(KernelObservation::Rollbacked {
908 turn: self.turn,
909 checkpoint_history_len: self.checkpoint.history_len as u32,
910 reason: Some(reason),
911 });
912 }
913
914 fn rollback_reason_for_tool_result(&self, result: &ToolResult) -> Option<RollbackReason> {
915 let tool_name = self.tool_name_for_call(&result.call_id);
916 let output = super::rollback::tool_result_output_text(result);
917
918 if result.is_fatal {
919 return Some(RollbackReason::FatalToolError {
920 tool_name,
921 error: output,
922 });
923 }
924
925 match result.error_kind {
926 Some(ToolErrorKind::Fatal) => Some(RollbackReason::FatalToolError {
927 tool_name,
928 error: output,
929 }),
930 Some(ToolErrorKind::GovernanceDenied) => Some(RollbackReason::GovernanceDenied {
931 tool_name,
932 reason: output,
933 }),
934 Some(ToolErrorKind::ProviderFailure) => {
935 Some(RollbackReason::ProviderFailure { error: output })
936 }
937 Some(ToolErrorKind::Timeout) => Some(RollbackReason::Timeout),
938 Some(ToolErrorKind::UserInterrupt) => Some(RollbackReason::UserInterrupt),
939 Some(ToolErrorKind::Recoverable) | None => None,
940 }
941 }
942
943 fn tool_name_for_call(&self, call_id: &compact_str::CompactString) -> String {
944 match &self.phase {
945 LoopPhase::Act { tool_calls } => tool_calls
946 .iter()
947 .find(|call| call.id == *call_id)
948 .map(|call| call.name.to_string())
949 .unwrap_or_else(|| call_id.to_string()),
950 _ => call_id.to_string(),
951 }
952 }
953}
954
955#[cfg(test)]
956#[path = "tests.rs"]
957mod tests;