deepstrike_core/scheduler/state_machine/mod.rs
1use std::collections::HashMap;
2
3use super::milestone::MilestoneTracker;
4use super::policy::LoopPolicy;
5use super::tcb::{ScheduleDecision, TaskState, TaskTable, Tcb, WaitReason};
6use crate::AgentRunSpec;
7use crate::context::manager::ContextManager;
8use crate::governance::pipeline::GovernancePipeline;
9use crate::signals::router::SignalRouter;
10use crate::types::result::SubAgentResult;
11use crate::context::renderer::RenderedContext;
12// `pub use` so external integration tests that glob `state_machine::*` resolve the observation
13// type here — exactly as they did for the former `pub enum LoopObservation` this replaced.
14pub use crate::runtime::kernel::KernelObservation;
15use crate::runtime::session::RollbackReason;
16use crate::types::message::{
17 Content, ContentPart, Message, ToolCall, ToolErrorKind, ToolResult, ToolSchema,
18};
19use crate::types::milestone::MilestoneCheckResult;
20use crate::types::result::{LoopResult, TerminationReason};
21use crate::types::signal::RuntimeSignal;
22use crate::types::task::RuntimeTask;
23
/// The *turn step* of the L* execution loop (M1d).
///
/// Schedulability (`Ready/Running/Blocked/Suspended/Done`) is no longer carried here — it lives
/// on the root task's [`TaskState`] in the kernel's `TaskTable`, queried via
/// [`LoopStateMachine::lifecycle`]. `LoopPhase` is now orthogonal: it only records *which step of a
/// running turn* the loop is in. When the task is `Ready/Suspended/Done`, the phase value is
/// inert (left at its last step) and ignored.
#[derive(Debug, Clone)]
pub enum LoopPhase {
    /// Reasoning step: set right before the loop asks the provider for a response
    /// (start, resume without pending calls, rollbacks, budget final-call, end of a tool turn).
    Reason,
    /// Acting step: a tool batch was emitted via `LoopAction::ExecuteTools`; `tool_calls`
    /// is that batch.
    Act { tool_calls: Vec<ToolCall> },
    /// Observing step carrying tool results.
    /// NOTE(review): never assigned in this file — confirm whether a sibling module sets it
    /// or whether the variant is vestigial.
    Observe { results: Vec<ToolResult> },
    /// Context-pressure step at the end of a tool turn. `pressure` is `ctx.rho()` sampled after
    /// time-decay eviction and handle-residency recomputation, before pressure-driven eviction.
    Delta { pressure: f64 },
}
38
/// Why the loop entered `Suspended` state.
///
/// NOTE(review): not referenced anywhere in this file; the live suspend payload is the
/// private `SuspendState`. Confirm whether external callers still rely on this type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SuspendReason {
    /// Governance `AskUser` — waiting for SDK to resolve human approval.
    AskUser,
    /// Sub-agent spawned — waiting for sub-agent to complete.
    SubAgentAwait,
    /// Externally requested suspension.
    External,
}
49
/// What the loop is blocked waiting for.
///
/// NOTE(review): not referenced anywhere in this file. For example, the milestone path in
/// `feed` returns `EvaluateMilestone` without recording a block reason — confirm who uses this.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockReason {
    /// Awaiting a tool's continuation (tool suspend pattern).
    ToolSuspend,
    /// Awaiting milestone evaluation result.
    MilestoneAwait,
}
58
/// Events fed into the state machine from the SDK layer.
///
/// Each event is consumed by `LoopStateMachine::feed`, which always answers with the next
/// [`LoopAction`].
#[derive(Debug)]
pub enum LoopEvent {
    /// Begin a run for `task`; handled exactly like calling `LoopStateMachine::start`.
    Start {
        task: RuntimeTask,
    },
    /// The provider's reply to the most recent `LoopAction::CallLLM`.
    LLMResponse {
        message: Message,
    },
    /// Outcomes of the most recent `LoopAction::ExecuteTools` batch. Denied results deferred
    /// by an AskUser resume are merged into this batch by the kernel.
    ToolResults {
        results: Vec<ToolResult>,
    },
    /// Inbound signal from SignalRouter — Critical/High urgency may interrupt.
    Signal {
        signal: RuntimeSignal,
    },
    /// Result of evaluating the current milestone phase's criteria.
    /// Feed this back after handling `LoopAction::EvaluateMilestone`.
    MilestoneResult {
        result: MilestoneCheckResult,
    },
    /// Sub-agent run completed — result is injected into the loop as context.
    SubAgentCompleted {
        result: SubAgentResult,
    },
    /// The in-flight operation timed out: the current turn is rolled back with
    /// `RollbackReason::Timeout` and the model is re-prompted.
    Timeout,
}
86
/// Actions the state machine outputs — SDK layer executes the I/O.
#[derive(Debug)]
pub enum LoopAction {
    /// Structured context ready for a provider call.
    /// `context.system_text` → provider system param.
    /// `context.turns` → provider messages array (strictly alternating).
    /// `tools` → tool schemas (skill / memory / knowledge / user tools).
    /// `tools` is empty when the kernel is forcing a final text-only response.
    CallLLM {
        context: RenderedContext,
        tools: Vec<ToolSchema>,
    },
    /// Run these tool calls and feed their outcomes back as `LoopEvent::ToolResults`.
    ExecuteTools {
        calls: Vec<ToolCall>,
    },
    /// The run has ended; `result` carries the termination reason, final message and usage.
    Done {
        result: LoopResult,
    },
    /// Kernel requests the SDK to evaluate the current milestone phase.
    ///
    /// The SDK should assess `criteria` against the agent's output using the
    /// specified `verifier`, then feed back `LoopEvent::MilestoneResult { result }`.
    EvaluateMilestone {
        phase_id: String,
        criteria: Vec<String>,
        verifier: Option<crate::types::milestone::MilestoneVerifier>,
        required_evidence: Vec<String>,
    },
    /// Kernel is suspended — SDK must resolve (e.g. human approval) and then resume the kernel.
    /// NOTE(review): `LoopEvent` has no `Resume` variant; resumption presumably goes through a
    /// separate `resume_*` entry point — confirm the exact API.
    AwaitingResume,
}
117
/// Payload held while the loop is in `Suspended`.
///
/// Stored in `LoopStateMachine::suspend_state`; `None` there means nothing is awaiting
/// external resolution.
#[derive(Debug, Clone)]
pub(super) enum SuspendState {
    /// Governance AskUser — awaiting `Resume { approved_calls, denied_calls }`.
    /// `gated_reasons` presumably maps a call id to the gate's reason text — TODO confirm.
    AskUser {
        calls: Vec<ToolCall>,
        gated_reasons: HashMap<String, String>,
    },
    /// Sub-agent spawn — awaiting `SubAgentCompleted` for each listed agent id.
    SubAgentAwait {
        agent_ids: Vec<String>,
    },
}
131
/// Verdict of running a model-proposed tool batch through `gate_tool_calls`.
pub(super) enum GateToolOutcome {
    /// The batch may run; `feed` goes on to emit `LoopAction::ExecuteTools`.
    Proceed,
    /// The batch must not run; `feed` returns the carried action unchanged.
    Blocked(LoopAction),
    /// The batch awaits external resolution; `feed` returns `LoopAction::AwaitingResume`.
    Suspended,
}
137
/// Snapshot of context lengths captured just before each LLM call.
/// Used internally to restore state on rollback.
#[derive(Debug, Clone, Default)]
pub struct TurnCheckpoint {
    /// Number of messages in the history partition at capture time.
    pub history_len: usize,
    /// Number of entries in the signals partition at capture time.
    pub signals_len: usize,
    /// Clone of the task-state partition at capture time; `None` (the default) until the
    /// first capture.
    pub task_state: Option<crate::context::task_state::TaskState>,
}
146
/// Pure state machine for the L* execution loop. No I/O — only state transitions.
///
/// Internal engine backing [`crate::runtime::KernelRuntime`]. Exposed for in-crate
/// use and tests; external callers should drive the kernel through `KernelRuntime`.
///
/// Driving protocol as implemented in this module: call `start` (or `preload_history` followed
/// by `resume_after_preload`), execute the returned [`LoopAction`], feed the outcome back
/// through `feed`, and repeat until `LoopAction::Done`. Observations produced by each call
/// are collected with `take_observations`.
#[doc(hidden)]
pub struct LoopStateMachine {
    /// Current intra-turn step; only meaningful while the root task is `Running`.
    pub phase: LoopPhase,
    /// Tool turns completed; incremented once per committed `ToolResults` batch.
    pub turn: u32,
    /// Context partitions (system / knowledge / history / task state / signals) plus the
    /// token-counting engine.
    pub ctx: ContextManager,
    /// Tool schemas offered on each `CallLLM`; `ctx.meta_tool_schemas()` are appended at emit
    /// time, and the list is dropped entirely for a forced final response.
    pub tools: Vec<ToolSchema>,
    /// Observations emitted by the current `start`/`feed` call; drained by `take_observations`.
    pub observations: Vec<KernelObservation>,
    /// Budget and sizing policy; also cloned into every root `Tcb` this machine builds.
    pub(super) policy: LoopPolicy,
    /// This vehicle's cumulative token spend: LLM responses plus reported tool-result tokens.
    pub(super) total_tokens: u64,
    /// L1 (RunGroup): cumulative tokens spent by *other* members of this run's governance domain,
    /// seeded at boot via `seed_group_budget`. The run-level token cap is enforced against
    /// `group_tokens_base + total_tokens` so the budget spans the whole group, not one vehicle.
    /// 0 (default) ⇒ no group (N=1) ⇒ pre-L1 per-kernel behavior (byte-identical).
    pub(super) group_tokens_base: u64,
    /// L1 (RunGroup): sub-agents spawned by *other* members of this run's governance domain, seeded
    /// at boot. `max_total_subagents` is enforced against `group_spawns_base + local spawns`. 0 ⇒ N=1.
    pub(super) group_spawns_base: u32,
    /// When set, the next LLM call strips tools to force a text response,
    /// then terminates with this reason once the response arrives.
    pub(super) pending_termination: Option<TerminationReason>,
    /// Number of history messages present at session start (after preload_history).
    /// drain_new_messages() returns the slice from this offset onward.
    pub(super) session_history_baseline: usize,
    /// Rollback target, refreshed by `emit_call_llm` just before each provider call.
    pub(super) checkpoint: TurnCheckpoint,
    /// Milestone contract tracker (extracted to reduce state machine bloat).
    pub(super) milestone: MilestoneTracker,
    /// Run specification, if configured; written into `snapshot()` and recovered by `restore()`.
    pub run_spec: Option<AgentRunSpec>,
    /// M1 consolidation: the single source of truth for schedulability *and* sub-agent lineage.
    /// Root is task `"root"`; each sub-agent is a child task carrying its `ProcInfo`. The former
    /// `ProcessTable` is now a derived view over this (`agent_process(es)` rebuild `AgentProcess`
    /// rows on demand via `AgentProcess::from_tcb`).
    pub(super) tasks: TaskTable,
    /// Optional governance pipeline. When set, every tool call proposed by the
    /// model is evaluated before `ExecuteTools` is emitted. `None` (default)
    /// skips the gate entirely, preserving the pre-governance behavior.
    pub(super) governance: Option<GovernancePipeline>,
    /// Optional resource quota evaluated at the syscall trap (M2). `None` (default) leaves spawn /
    /// memory syscalls unconditionally allowed, preserving pre-M2 behavior.
    pub(super) resource_quota: Option<crate::governance::quota::ResourceQuota>,
    /// Timestamps of recent allowed `WriteMemory` syscalls, for the rolling-window rate limit.
    /// Only populated when `resource_quota.memory_writes_per_window` is set.
    pub(super) memory_write_times: Vec<u64>,
    /// Optional long-term memory policy (`set_memory_policy`). `None` (default) preserves
    /// pre-policy behavior: default-rule validation + verbatim retrieval `top_k`.
    pub(super) memory_policy: Option<crate::mm::memory::MemoryPolicy>,
    /// Optional in-kernel signal router. When set, inbound signals are routed
    /// through dedup + attention policy + queue here (the kernel owns disposition).
    /// Both `new()` and `restore()` install one (`SignalRouter::new(64)`), so it is `Some`
    /// unless replaced elsewhere; its dedup set is cleared on every sprint renewal.
    /// A `None` router keeps the legacy hardcoded urgency handling
    /// (NOTE(review): that fallback lives in `dispatch_signal`, not visible here).
    pub(super) signal_router: Option<SignalRouter>,
    /// Wall-clock timestamp of the first `ProviderResult.now_ms` received.
    /// Used by the wall-time budget axis in `SchedulerBudget::should_terminate`.
    pub(super) started_at_ms: Option<u64>,
    /// Most-recent `now_ms` value from `ProviderResult`, forwarded to the budget check.
    pub(super) last_now_ms: Option<u64>,
    /// Tool batch awaiting `Resume` after an AskUser suspend.
    pub(super) suspend_state: Option<SuspendState>,
    /// Denied tool results to merge into the next `ToolResults` feed after resume.
    pub(super) pending_denied_results: Vec<ToolResult>,
    /// W0: an in-flight workflow DAG, when one is loaded. The kernel spawns its ready nodes as
    /// gated batches (each through `evaluate_syscall(Syscall::Spawn)`) and advances on
    /// completions. `None` (default) preserves the single-spawn `spawn_sub_agent` behavior.
    pub(super) workflow: Option<crate::orchestration::workflow::WorkflowRun>,
}
214
215mod signal;
216mod capability;
217mod gate;
218mod eviction;
219mod process;
220mod workflow;
221mod milestone_exec;
222
223impl LoopStateMachine {
224 fn message_tokens(&self, message: &Message) -> u32 {
225 message
226 .token_count
227 .unwrap_or_else(|| self.ctx.engine.count_message(message))
228 }
229
230 pub fn new(policy: LoopPolicy) -> Self {
231 let mut tasks = TaskTable::new();
232 // M1d: the root task carries the authoritative schedulability lifecycle. It starts
233 // `Ready`; `start()`/`resume_*` flip it to `Running`, suspends set `Suspended`, and
234 // `terminate()` sets `Done`. `phase` is now only the intra-turn step.
235 tasks.insert(Tcb::root("root", policy.clone()));
236 Self {
237 // Inert placeholder step; meaningful only while the root task is `Running`.
238 phase: LoopPhase::Reason,
239 turn: 0,
240 ctx: ContextManager::new(policy.max_tokens),
241 tools: Vec::new(),
242 observations: Vec::new(),
243 policy,
244 total_tokens: 0,
245 group_tokens_base: 0,
246 group_spawns_base: 0,
247 pending_termination: None,
248 session_history_baseline: 0,
249 checkpoint: TurnCheckpoint::default(),
250 milestone: MilestoneTracker::new(),
251 run_spec: None,
252 tasks,
253 governance: None,
254 resource_quota: None,
255 memory_write_times: Vec::new(),
256 memory_policy: None,
257 signal_router: Some(SignalRouter::new(64)),
258 started_at_ms: None,
259 last_now_ms: None,
260 suspend_state: None,
261 pending_denied_results: Vec::new(),
262 workflow: None,
263 }
264 }
265
266 /// The authoritative schedulability lifecycle of the loop (root task state). Replaces the
267 /// removed `LoopPhase::{Idle,Suspended,Blocked,Terminal}` reads.
268 pub fn lifecycle(&self) -> TaskState {
269 self.tasks.get("root").map(|t| t.state).unwrap_or(TaskState::Ready)
270 }
271
272 /// The wait reason while suspended/blocked, if any.
273 pub fn wait_reason(&self) -> Option<WaitReason> {
274 self.tasks.get("root").and_then(|t| t.wait.clone())
275 }
276
277 /// Whether the loop has terminated.
278 pub fn is_terminal(&self) -> bool {
279 matches!(self.lifecycle(), TaskState::Done(_))
280 }
281
282 /// Whether the loop is suspended awaiting external resolution.
283 pub fn is_suspended(&self) -> bool {
284 matches!(self.lifecycle(), TaskState::Suspended)
285 }
286
287 /// Set the root task's lifecycle (and wait reason). Single mutation point for schedulability.
288 fn set_lifecycle(&mut self, state: TaskState, wait: Option<WaitReason>) {
289 if let Some(root) = self.tasks.get_mut("root") {
290 root.state = state;
291 root.wait = wait;
292 } else {
293 let mut root = Tcb::root("root", self.policy.clone());
294 root.state = state;
295 root.wait = wait;
296 self.tasks.insert(root);
297 }
298 }
299
300 /// Build a transient root [`Tcb`] mirroring the current scheduling facts (budget counters,
301 /// wall-clock anchors, lifecycle). M1b uses this to run the pure `schedule()` spine in
302 /// parallel with the legacy budget path; later milestones promote it to the live task row.
303 fn root_tcb(&self) -> Tcb {
304 let mut tcb = Tcb::root("root", self.policy.clone());
305 tcb.budget.turns = self.turn;
306 // L1: the token-budget axis is evaluated against the whole governance domain's cumulative
307 // spend (this vehicle's `total_tokens` plus other members' `group_tokens_base`).
308 tcb.budget.total_tokens = self.total_tokens.saturating_add(self.group_tokens_base);
309 tcb.budget.started_at_ms = self.started_at_ms;
310 tcb.state = self.lifecycle();
311 tcb
312 }
313
314 /// Adjust the wall-clock budget axis at runtime.
315 pub fn set_wall_budget(&mut self, max_wall_ms: Option<u64>) {
316 self.policy.max_wall_ms = max_wall_ms;
317 }
318
319 /// Install a governance pipeline. Once set, all model-proposed tool calls
320 /// are evaluated before execution. Denied/rate-limited calls roll the turn
321 /// back (reusing the `GovernanceDenied` path); `AskUser` calls surface a
322 /// `ToolGated` observation for the SDK to enforce.
323 pub fn set_governance(&mut self, pipeline: GovernancePipeline) {
324 self.governance = Some(pipeline);
325 }
326
327 /// Install resource quotas (M2). Once set, `Spawn` and `WriteMemory` syscalls are bounded by
328 /// the quota at the trap. Not setting it (the default) leaves them unconditionally allowed.
329 pub fn set_resource_quota(&mut self, quota: crate::governance::quota::ResourceQuota) {
330 self.resource_quota = Some(quota);
331 }
332
333 /// L1 (RunGroup): seed the cumulative tokens already spent by other members of this run's
334 /// governance domain. The run-level token cap is then enforced against the group total. Seeding
335 /// 0 (the default) preserves pre-L1 per-vehicle behavior.
336 pub fn seed_group_budget(&mut self, tokens_spent: u64) {
337 self.group_tokens_base = tokens_spent;
338 }
339
340 /// L1 (RunGroup): seed the sub-agents already spawned by other members of this run's governance
341 /// domain. `max_total_subagents` is then enforced against the group total. 0 ⇒ pre-L1 behavior.
342 pub fn seed_group_spawns(&mut self, subagents_spawned: u32) {
343 self.group_spawns_base = subagents_spawned;
344 }
345
346 /// L1: this vehicle's cumulative sub-agent spawns this run — every child task ever registered in
347 /// the `TaskTable` (running + completed), distinct from the *instantaneous* running count. Used
348 /// for the cumulative spawn quota and read back by the SDK to charge the group ledger at run end.
349 pub fn local_subagents_spawned(&self) -> u32 {
350 self.tasks.all().iter().filter(|t| t.proc.is_some()).count() as u32
351 }
352
353 /// Install the long-term memory policy (`set_memory_policy`). Once set it gates `write_memory`
354 /// validation and bounds `query_memory` retrieval breadth. Not setting it (the default)
355 /// preserves pre-policy behavior.
356 pub fn set_memory_policy(&mut self, policy: crate::mm::memory::MemoryPolicy) {
357 self.memory_policy = Some(policy);
358 }
359
360 /// The installed memory policy, if any. `None` means default-rule validation + verbatim top_k.
361 pub fn memory_policy(&self) -> Option<&crate::mm::memory::MemoryPolicy> {
362 self.memory_policy.as_ref()
363 }
364
365 /// Feed the current wall-clock time (ms) to scheduler/governance budget axes.
366 pub fn set_observed_time(&mut self, now_ms: u64) {
367 if self.started_at_ms.is_none() {
368 self.started_at_ms = Some(now_ms);
369 }
370 self.last_now_ms = Some(now_ms);
371 if let Some(pipeline) = self.governance.as_mut() {
372 pipeline.set_time(now_ms);
373 }
374 }
375
376 /// Pre-populate the history partition with messages from a prior session.
377 ///
378 /// Call **before** `start()` when resuming a conversation. Sets the baseline
379 /// so `drain_new_messages()` returns only the messages from the current run.
380 pub fn preload_history(&mut self, messages: Vec<Message>) {
381 for msg in messages {
382 let tokens = self.message_tokens(&msg);
383 self.ctx.push_history(msg, tokens);
384 }
385 self.session_history_baseline = self.ctx.partitions.history.messages.len();
386 }
387
388 /// Continue from preloaded history without appending a new user turn.
389 /// Use after `preload_history` when recovering a session that ended mid-run.
390 ///
391 /// If the last assistant turn has tool calls without matching tool results,
392 /// resumes with `ExecuteTools` instead of calling the LLM again.
393 pub fn resume_after_preload(&mut self) -> LoopAction {
394 self.observations.clear();
395 let calls = crate::runtime::repair::pending_tool_calls_from_messages(
396 &self.ctx.partitions.history.messages,
397 );
398 if !calls.is_empty() {
399 self.emit_page_in_requested(&calls);
400 self.phase = LoopPhase::Act {
401 tool_calls: calls.clone(),
402 };
403 self.set_lifecycle(TaskState::Running, None);
404 return LoopAction::ExecuteTools { calls };
405 }
406 self.phase = LoopPhase::Reason;
407 self.emit_call_llm()
408 }
409
410 /// Return all messages added to history during the current run
411 /// (since the last `preload_history` call or since construction).
412 ///
413 /// Call after `LoopAction::Done` to get the complete turn transcript
414 /// for persistence to a SessionStore.
415 pub fn drain_new_messages(&self) -> Vec<Message> {
416 let history = &self.ctx.partitions.history.messages;
417 let start = self.session_history_baseline.min(history.len());
418 history[start..].to_vec()
419 }
420
421 pub fn start(&mut self, task: RuntimeTask) -> LoopAction {
422 self.observations.clear();
423 self.ctx.init_task(task.goal.clone(), task.criteria.clone());
424
425 let user_msg = "Proceed with the task described in [TASK STATE].".to_string();
426
427 // User message goes into history so it appears at the correct chronological
428 // position: [prior turns...] → [current user message] — LLM reads left-to-right
429 // and responds to the last message. working is reserved for runtime signals only.
430 // Estimate tokens (1 token ≈ 4 chars) with a minimum of 1 so the renderer
431 // does not skip this message (it skips zero-token entries).
432 let user_tokens = self.ctx.engine.count(&user_msg).max(1);
433 self.ctx.push_history(Message::user(user_msg), user_tokens);
434 self.phase = LoopPhase::Reason;
435 // Root task (seeded `Ready` in `new()`) becomes `Running`; `emit_call_llm` sets it.
436 self.emit_call_llm()
437 }
438
439 pub fn feed(&mut self, event: LoopEvent) -> LoopAction {
440 self.observations.clear();
441 self.sweep_expired_leases();
442
443 match event {
444 LoopEvent::Start { task } => self.start(task),
445
446 LoopEvent::LLMResponse { message } => {
447 let tokens = self.message_tokens(&message);
448 self.total_tokens += tokens as u64;
449
450 if let Some(reason) = self.pending_termination.take() {
451 return self.terminate(reason, Some(message));
452 }
453
454 if message.tool_calls.is_empty() {
455 // When a milestone contract is active and not yet complete,
456 // request evaluation instead of terminating.
457 if !self.milestone.is_complete() {
458 let phase_id = self.milestone.current_phase_id().unwrap_or("").to_string();
459 let criteria = self.milestone.current_criteria().to_vec();
460 let (verifier, required_evidence) = self
461 .milestone
462 .contract
463 .as_ref()
464 .and_then(|c| c.phases.get(self.milestone.current_phase))
465 .map(|p| (p.verifier.clone(), p.required_evidence.clone()))
466 .unwrap_or_default();
467 // `tokens` was already computed for this message above.
468 self.ctx.push_history(message, tokens);
469 return LoopAction::EvaluateMilestone {
470 phase_id,
471 criteria,
472 verifier,
473 required_evidence,
474 };
475 }
476 return self.terminate(TerminationReason::Completed, Some(message));
477 }
478
479 let calls = message.tool_calls.clone();
480 self.ctx.push_history(message, tokens);
481
482 // ━━ 记录活动时间(Layer 3时间衰减使用)
483 if let Some(now_ms) = self.last_now_ms {
484 self.ctx.record_activity(now_ms);
485 }
486
487 // 2b: record this turn's tool activity into the task-state recency log (meta-tools
488 // filtered inside). The State-turn footer renders it as "just did: …" + a forward
489 // nudge / STOP, so progress is kernel-derived and never depends on the model
490 // remembering to call `update_plan`. Tool *names* live only on the request (results
491 // carry call_id only), so this is the turn to capture them.
492 let action_names: Vec<String> =
493 calls.iter().map(|c| c.name.to_string()).collect();
494 self.ctx.note_tool_actions(&action_names);
495
496 match self.gate_tool_calls(&calls) {
497 GateToolOutcome::Blocked(action) => return action,
498 GateToolOutcome::Suspended => return LoopAction::AwaitingResume,
499 GateToolOutcome::Proceed => {}
500 }
501 self.emit_page_in_requested(&calls);
502 self.phase = LoopPhase::Act {
503 tool_calls: calls.clone(),
504 };
505 self.set_lifecycle(TaskState::Running, None);
506 LoopAction::ExecuteTools { calls }
507 }
508
509 LoopEvent::ToolResults { mut results } => {
510 if !self.pending_denied_results.is_empty() {
511 results.append(&mut self.pending_denied_results);
512 }
513 if let Some(reason) = results
514 .iter()
515 .find_map(|result| self.rollback_reason_for_tool_result(result))
516 {
517 let note = Message::user(super::rollback::build_rollback_note(
518 &reason,
519 self.ctx.config.verbose_control_notes,
520 ));
521 self.rollback(reason);
522 self.ctx.push_signal(note.content.as_text().unwrap_or_default().to_string());
523 self.phase = LoopPhase::Reason;
524 return self.emit_call_llm();
525 }
526 // Non-fatal errors are committed to history so the LLM can
527 // see them and self-correct without losing turn state.
528
529 for r in &results {
530 self.total_tokens += r.token_count.unwrap_or(0) as u64;
531 // Preserve Content::Parts (structured / multimodal tool output).
532 // Parts are serialised to JSON so the text can be restored faithfully.
533 let raw_output = match &r.output {
534 Content::Text(s) => s.clone(),
535 Content::Parts(parts) => serde_json::to_string(parts).unwrap_or_default(),
536 };
537 // Layer 1 spool: oversized results keep only a preview in context; the kernel
538 // emits `LargeResultSpooled` so the SDK persists the full output it still holds.
539 let (output, spooled) = match crate::mm::plan_spool(
540 &raw_output,
541 self.ctx.config.spool_threshold_bytes,
542 self.ctx.config.spool_preview_bytes,
543 ) {
544 Some(decision) => {
545 self.observations.push(KernelObservation::LargeResultSpooled {
546 turn: self.turn,
547 call_id: r.call_id.to_string(),
548 // ToolResult carries no tool name; the SDK maps call_id -> tool.
549 tool: String::new(),
550 original_size: decision.original_size,
551 preview_size: decision.preview.len() as u32,
552 spool_ref: None,
553 });
554 (decision.preview, true)
555 }
556 None => (raw_output, false),
557 };
558 let parts = vec![ContentPart::ToolResult {
559 call_id: r.call_id.clone(),
560 output,
561 is_error: r.is_error,
562 }];
563 let tool_msg = Message::tool(parts);
564 // When spooled, `r.token_count` reflects the full output — recount the preview.
565 let tokens = if spooled {
566 self.ctx.engine.count_message(&tool_msg)
567 } else {
568 r.token_count
569 .unwrap_or_else(|| self.ctx.engine.count_message(&tool_msg))
570 };
571 self.ctx.push_history(tool_msg, tokens);
572 // Layer 1: a spooled result's handle is marked SpooledOut (its full output now
573 // lives on disk via the SDK); the SDK maps call_id -> the persisted ref.
574 if spooled {
575 self.ctx.mark_spooled(&r.call_id, r.call_id.to_string());
576 }
577 }
578 self.turn += 1;
579
580 // M1 收口: the pure `schedule()` is now the single budget decision point.
581 // It evaluates the same three axes (turn/token/wall) via `BudgetLedger`, which
582 // delegates to `SchedulerBudget::should_terminate` internally — one source of truth.
583 if let ScheduleDecision::Terminate { reason: term, .. } =
584 super::tcb::schedule(&self.root_tcb(), self.last_now_ms)
585 {
586 let budget = match term {
587 TerminationReason::MaxTurns => "max_turns",
588 TerminationReason::Timeout => "wall_time",
589 _ => "token_budget",
590 };
591 self.observations.push(KernelObservation::BudgetExceeded {
592 turn: self.turn,
593 budget: budget.to_string(),
594 });
595 self.pending_termination = Some(term);
596 self.phase = LoopPhase::Reason;
597 return self.emit_call_llm();
598 }
599
600 // ━━ Eviction checkpoint (M3): one decision model (`plan_eviction`), one
601 // execution funnel (`execute_eviction_op`). Layer 3 (idle/time-decay) must run
602 // before the rho recommendation is read, since it mutates token usage — so the
603 // plan is built in that interleaved order and the ops are executed in plan order.
604 let idle_decay = self
605 .last_now_ms
606 .is_some_and(|now_ms| self.ctx.should_time_decay_compact(now_ms));
607 if idle_decay {
608 self.execute_eviction_op(&crate::mm::EvictionOp::TimeDecayMicro);
609 }
610
611 // Layer 4 read-time projection: recompute handle residency on the post-time-decay rho.
612 self.ctx.recompute_handle_residency();
613 self.phase = LoopPhase::Delta {
614 pressure: self.ctx.rho(),
615 };
616
617 // Layers 2/4/5: execute the pressure-driven ops from the plan (skip TimeDecayMicro
618 // if already executed). The plan carries specific ops stamped with real config-derived
619 // params (W1-1 収口 — no magic-number placeholders), not the umbrella `Pressure` wrapper.
620 let (target_tokens, preserve_turns) = self.ctx.plan_compaction_params();
621 let plan =
622 crate::mm::plan_eviction(self.ctx.should_compress(), idle_decay, target_tokens, preserve_turns);
623 // `idle_decay` ⇒ the plan carries a `TimeDecayMicro` (so the skip-on-already-executed
624 // below is meaningful). The converse does NOT hold: a pressure-driven `MicroCompact`
625 // also emits `TimeDecayMicro` independent of `idle_decay` (W1 unified planner), so we
626 // assert the implication, not equality.
627 debug_assert!(!idle_decay || plan.has_time_decay());
628 for op in &plan.ops {
629 // Skip TimeDecayMicro if we already executed it (prevents double-execution).
630 if matches!(op, crate::mm::EvictionOp::TimeDecayMicro) && idle_decay {
631 continue;
632 }
633 self.execute_eviction_op(op);
634 }
635
636 // Renewal: when compression alone cannot recover enough headroom,
637 // start a new sprint — carry forward system + memory + last N history turns.
638 if self.ctx.should_renew() {
639 self.ctx.renew();
640 // A new sprint is a session boundary for signal identity: clear the dedup set so
641 // it cannot grow unbounded across a long run, and so a signal seen in a prior
642 // sprint may legitimately re-fire in the new one.
643 if let Some(router) = self.signal_router.as_mut() {
644 router.clear_dedup();
645 }
646 self.observations.push(KernelObservation::Renewed {
647 sprint: self.ctx.sprint,
648 });
649 }
650
651 // Turn boundary: drain any kernel-queued signals into context so they
652 // are seen on the next reasoning turn (ready queue → running).
653 self.drain_queued_signals();
654
655 self.phase = LoopPhase::Reason;
656 self.emit_call_llm()
657 }
658
659 LoopEvent::Signal { signal } => {
660 // `feed` always returns an action; non-actionable dispositions
661 // (queue/observe/ignore) fall back to a plain provider call here.
662 // The kernel-routed path (`dispatch_signal`) is driven via the ABI.
663 self.dispatch_signal(signal)
664 .unwrap_or_else(|| self.emit_call_llm())
665 }
666
667 LoopEvent::MilestoneResult { result } => self.handle_milestone_result(result),
668
669 LoopEvent::SubAgentCompleted { result } => self.handle_sub_agent_completed(result),
670
671 LoopEvent::Timeout => {
672 let reason = RollbackReason::Timeout;
673 let note = Message::user(super::rollback::build_rollback_note(
674 &reason,
675 self.ctx.config.verbose_control_notes,
676 ));
677 self.rollback(reason);
678 self.ctx.push_signal(note.content.as_text().unwrap_or_default().to_string());
679 self.phase = LoopPhase::Reason;
680 self.emit_call_llm()
681 }
682 }
683 }
684
685
686 /// Drain observations emitted during the last `start`/`feed` call.
687 pub fn take_observations(&mut self) -> Vec<KernelObservation> {
688 std::mem::take(&mut self.observations)
689 }
690
691 /// W2-2: Create a snapshot of the current kernel state for crash recovery or migration.
692 pub fn snapshot(&self) -> crate::runtime::snapshot::KernelSnapshot {
693 use crate::runtime::snapshot::{ContextSnapshot, KernelSnapshot};
694 let context = ContextSnapshot::from_context(&self.ctx);
695 KernelSnapshot::from_state(
696 self.turn,
697 self.total_tokens,
698 &self.tasks,
699 &context,
700 self.run_spec.as_ref(),
701 )
702 }
703
704 /// W2-2: Restore kernel state from a snapshot. Returns a new LoopStateMachine rebuilt from the snapshot.
705 /// Note: This is a foundational restore - some state (governance, milestone, signal router dedup) is
706 /// recreated from policy/config rather than serialized, following the principle that strategy is data.
707 pub fn restore(snap: &crate::runtime::snapshot::KernelSnapshot) -> Self {
708 use crate::signals::router::SignalRouter;
709
710 // Reconstruct policy from the max_tokens in snapshot
711 let policy = crate::scheduler::policy::LoopPolicy {
712 max_tokens: snap.context.max_tokens,
713 ..Default::default()
714 };
715
716 // Rebuild TaskTable from snapshot TCBs
717 let mut tasks = TaskTable::new();
718 for tcb_snap in &snap.tasks {
719 if let Some(tcb) = snap.restore_tcb(tcb_snap) {
720 tasks.insert(tcb);
721 }
722 }
723
724 // Rebuild context partitions from snapshot
725 let mut ctx = ContextManager::new(snap.context.max_tokens);
726 ctx.sprint = snap.context.sprint;
727
728 // Restore messages
729 for msg in &snap.context.system_messages {
730 let tokens = ctx.engine.count_message(msg);
731 ctx.partitions.system.push(msg.clone(), tokens);
732 }
733 for msg in &snap.context.knowledge_messages {
734 let tokens = ctx.engine.count_message(msg);
735 ctx.partitions.knowledge.push(msg.clone(), tokens);
736 }
737 for msg in &snap.context.history_messages {
738 let tokens = ctx.engine.count_message(msg);
739 ctx.partitions.history.push(msg.clone(), tokens);
740 }
741
742 // Restore task state
743 if let Some(goal) = &snap.context.task_goal {
744 ctx.partitions.task_state.goal = goal.clone();
745 }
746 if let Some(plan_json) = &snap.context.task_plan {
747 if let Ok(plan_steps) = serde_json::from_str::<Vec<crate::context::task_state::PlanStep>>(plan_json) {
748 ctx.partitions.task_state.plan = plan_steps;
749 }
750 }
751 if let Some(progress) = &snap.context.task_progress {
752 ctx.partitions.task_state.progress = progress.clone();
753 }
754 ctx.partitions.task_state.directives = snap.context.task_directives.clone();
755
756 // Restore signals
757 ctx.partitions.signals = snap.context.signals.clone();
758
759 Self {
760 phase: LoopPhase::Reason,
761 turn: snap.turn,
762 ctx,
763 tools: Vec::new(), // Tools are rebuilt from capabilities on next LLM call
764 observations: Vec::new(),
765 policy,
766 total_tokens: snap.total_tokens,
767 // Re-seeded from the replayed `ConfigureRun` (strategy is data, not serialized state).
768 group_tokens_base: 0,
769 group_spawns_base: 0,
770 pending_termination: None,
771 session_history_baseline: 0,
772 checkpoint: TurnCheckpoint::default(),
773 milestone: crate::scheduler::milestone::MilestoneTracker::new(),
774 run_spec: snap.run_spec(),
775 tasks,
776 governance: None, // Governance is policy data, recreated from config
777 resource_quota: None,
778 memory_write_times: Vec::new(),
779 memory_policy: None,
780 signal_router: Some(SignalRouter::new(64)), // Dedup cleared on restore
781 started_at_ms: None,
782 last_now_ms: None,
783 suspend_state: None,
784 pending_denied_results: Vec::new(),
785 workflow: None,
786 }
787 }
788
789 fn terminate(
790 &mut self,
791 termination: TerminationReason,
792 final_message: Option<Message>,
793 ) -> LoopAction {
794 // Commit the final response into history so subsequent session restores
795 // include the complete transcript: user → [tool turns] → final assistant.
796 if let Some(ref msg) = final_message {
797 let tokens = self.message_tokens(msg);
798 self.ctx.push_history(msg.clone(), tokens);
799 }
800 let result = LoopResult {
801 termination,
802 final_message,
803 turns_used: self.turn,
804 total_tokens_used: self.total_tokens,
805 loop_continue: None,
806 classify_branch: None,
807 tournament_winner: None,
808 };
809 self.set_lifecycle(TaskState::Done(termination), None);
810 LoopAction::Done { result }
811 }
812
813 /// Build the `CallLLM` action with a structured `RenderedContext`.
814 /// Meta-tools (skill / memory / knowledge) are appended to the tool list
815 /// when configured. When `pending_termination` is set, tools are stripped
816 /// to force a plain-text response before the loop terminates.
817 fn emit_call_llm(&mut self) -> LoopAction {
818 // Calling the provider is definitionally "running" — the single funnel for entering the
819 // Running lifecycle (covers start, resume, signal-driven turns, budget final-call).
820 self.set_lifecycle(TaskState::Running, None);
821 self.checkpoint.history_len = self.ctx.partitions.history.messages.len();
822 self.checkpoint.signals_len = self.ctx.partitions.signals.len();
823 self.checkpoint.task_state = Some(self.ctx.partitions.task_state.clone());
824 self.observations.push(KernelObservation::CheckpointTaken {
825 turn: self.turn,
826 history_len: self.checkpoint.history_len as u32,
827 });
828
829 let context = self.ctx.render();
830 if self.pending_termination.is_some() {
831 return LoopAction::CallLLM {
832 context,
833 tools: Vec::new(),
834 };
835 }
836 let mut tools = self.tools.clone();
837 tools.extend(self.ctx.meta_tool_schemas());
838
839 if let Some(ref spec) = self.run_spec {
840 use crate::types::capability::CapabilityKind;
841 tools.retain(|tool| {
842 let kind = match tool.name.as_str() {
843 "skill" => CapabilityKind::Skill,
844 "memory" => CapabilityKind::Memory,
845 "knowledge" => CapabilityKind::Knowledge,
846 _ => CapabilityKind::Tool,
847 };
848 let desc = crate::types::capability::CapabilityDescriptor::marker(
849 kind,
850 tool.name.clone(),
851 &tool.description,
852 );
853 spec.capability_filter.allows(&desc)
854 });
855 }
856
857 // P1-B epoch skill gating (applied *after* the run-level filter ③, so A is the outer bound
858 // and B narrows within it — D6). When skills are active and declare tools, expose only
859 // `meta-tools ∪ stable-core ∪ ⋃(active skills' allowed_tools)`. `None` ⇒ no active/declared
860 // skill ⇒ no narrowing (D3, errs-open). Meta-tools are always exempt (D5) so the model can
861 // still load more skills. Byte-stable within an epoch: the set only changes on activation.
862 if let Some(allowed) = self.ctx.active_skill_tool_filter() {
863 let stable = &self.ctx.stable_core_tools;
864 tools.retain(|tool| {
865 matches!(tool.name.as_str(), "skill" | "memory" | "knowledge" | "update_plan")
866 || stable.contains(&tool.name)
867 || allowed.contains(&tool.name)
868 });
869 }
870
871 LoopAction::CallLLM { context, tools }
872 }
873
874 pub fn rollback(&mut self, reason: RollbackReason) {
875 self.ctx.partitions.history.messages.truncate(self.checkpoint.history_len);
876 self.ctx.partitions.signals.truncate(self.checkpoint.signals_len);
877 if let Some(ref state) = self.checkpoint.task_state {
878 self.ctx.partitions.task_state = state.clone();
879 }
880 self.observations.push(KernelObservation::Rollbacked {
881 turn: self.turn,
882 checkpoint_history_len: self.checkpoint.history_len as u32,
883 reason: Some(reason),
884 });
885 }
886
887 fn rollback_reason_for_tool_result(&self, result: &ToolResult) -> Option<RollbackReason> {
888 let tool_name = self.tool_name_for_call(&result.call_id);
889 let output = super::rollback::tool_result_output_text(result);
890
891 if result.is_fatal {
892 return Some(RollbackReason::FatalToolError {
893 tool_name,
894 error: output,
895 });
896 }
897
898 match result.error_kind {
899 Some(ToolErrorKind::Fatal) => Some(RollbackReason::FatalToolError {
900 tool_name,
901 error: output,
902 }),
903 Some(ToolErrorKind::GovernanceDenied) => Some(RollbackReason::GovernanceDenied {
904 tool_name,
905 reason: output,
906 }),
907 Some(ToolErrorKind::ProviderFailure) => {
908 Some(RollbackReason::ProviderFailure { error: output })
909 }
910 Some(ToolErrorKind::Timeout) => Some(RollbackReason::Timeout),
911 Some(ToolErrorKind::UserInterrupt) => Some(RollbackReason::UserInterrupt),
912 Some(ToolErrorKind::Recoverable) | None => None,
913 }
914 }
915
916 fn tool_name_for_call(&self, call_id: &compact_str::CompactString) -> String {
917 match &self.phase {
918 LoopPhase::Act { tool_calls } => tool_calls
919 .iter()
920 .find(|call| call.id == *call_id)
921 .map(|call| call.name.to_string())
922 .unwrap_or_else(|| call_id.to_string()),
923 _ => call_id.to_string(),
924 }
925 }
926}
927
// Tests for this module live out-of-line in `tests.rs` (next to this `mod.rs`). They are
// compiled only for test builds.
#[cfg(test)]
#[path = "tests.rs"]
mod tests;