Skip to main content

deepstrike_core/scheduler/state_machine/
mod.rs

1use std::collections::HashMap;
2
3use super::milestone::MilestoneTracker;
4use super::policy::LoopPolicy;
5use super::tcb::{ScheduleDecision, TaskState, TaskTable, Tcb, WaitReason};
6use crate::AgentRunSpec;
7use crate::context::manager::ContextManager;
8use crate::governance::pipeline::GovernancePipeline;
9use crate::signals::router::SignalRouter;
10use crate::types::result::SubAgentResult;
11use crate::context::renderer::RenderedContext;
12// `pub use` so external integration tests that glob `state_machine::*` resolve the observation
13// type here — exactly as they did for the former `pub enum LoopObservation` this replaced.
14pub use crate::runtime::kernel::KernelObservation;
15use crate::runtime::session::RollbackReason;
16use crate::types::message::{
17    Content, ContentPart, Message, ToolCall, ToolErrorKind, ToolResult, ToolSchema,
18};
19use crate::types::milestone::MilestoneCheckResult;
20use crate::types::result::{LoopResult, TerminationReason};
21use crate::types::signal::RuntimeSignal;
22use crate::types::task::RuntimeTask;
23
24/// Compact digest of a tool call's arguments for the recency log (2b). Kept short and CJK-safe — it
25/// only needs to make `same-tool / different-args` calls distinguishable (so a legit loop isn't
26/// flagged as a no-progress repeat) and to read sensibly in the "just did: …" footer. Empty for
27/// no-arg / `{}` calls. Lives in the volatile State turn, so length here never churns the cache.
28fn compact_tool_args(args: &serde_json::Value) -> String {
29    if args.is_null() {
30        return String::new();
31    }
32    let s = args.to_string();
33    if s == "{}" {
34        return String::new();
35    }
36    const MAX: usize = 48;
37    if s.chars().count() <= MAX {
38        s
39    } else {
40        format!("{}…", s.chars().take(MAX).collect::<String>())
41    }
42}
43
/// The *turn step* of the L* execution loop (M1d).
///
/// Schedulability (`Ready/Running/Blocked/Suspended/Done`) is no longer carried here — it lives
/// on the root task's [`TaskState`] in the kernel's `TaskTable`, queried via
/// [`LoopStateMachine::lifecycle`]. `LoopPhase` is now orthogonal: it only records *which step of a
/// running turn* the loop is in. When the task is `Ready/Suspended/Done`, the phase value is
/// inert (left at its last step) and ignored.
#[derive(Debug, Clone)]
pub enum LoopPhase {
    /// Reasoning step — set immediately before the loop asks for an LLM call.
    Reason,
    /// Tool calls have been handed out via `LoopAction::ExecuteTools`; carries those calls.
    Act { tool_calls: Vec<ToolCall> },
    /// Carries tool results. NOTE(review): presumably the step between `Act` and `Delta` —
    /// confirm where (and whether) it is still entered.
    Observe { results: Vec<ToolResult> },
    /// Entered after tool results are committed; `pressure` is the context pressure
    /// (`ctx.rho()`) sampled at that point.
    Delta { pressure: f64 },
}
58
/// Why the loop entered `Suspended` state.
///
/// A plain `Copy` tag with no payload; the data needed to resume is kept separately
/// (see `SuspendState`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SuspendReason {
    /// Governance `AskUser` — waiting for SDK to resolve human approval.
    AskUser,
    /// Sub-agent spawned — waiting for sub-agent to complete.
    SubAgentAwait,
    /// Externally requested suspension.
    External,
}
69
/// What the loop is blocked waiting for.
///
/// A plain `Copy` tag with no payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockReason {
    /// Awaiting a tool's continuation (tool suspend pattern).
    ToolSuspend,
    /// Awaiting milestone evaluation result.
    MilestoneAwait,
}
78
/// Events fed into the state machine from the SDK layer.
///
/// Each event is consumed by `LoopStateMachine::feed`, which answers with the next
/// [`LoopAction`] for the SDK to execute.
#[derive(Debug)]
pub enum LoopEvent {
    /// Begin a run for `task`; `feed` forwards this to `LoopStateMachine::start`.
    Start {
        task: RuntimeTask,
    },
    /// An assistant message returned by the model. Its `tool_calls` decide whether the loop
    /// executes tools, requests milestone evaluation, or terminates.
    LLMResponse {
        message: Message,
    },
    /// Outcomes of previously requested tool calls, to be committed to history.
    ToolResults {
        results: Vec<ToolResult>,
    },
    /// Inbound signal from SignalRouter — Critical/High urgency may interrupt.
    Signal {
        signal: RuntimeSignal,
    },
    /// Result of evaluating the current milestone phase's criteria.
    /// Feed this back after handling `LoopAction::EvaluateMilestone`.
    MilestoneResult {
        result: MilestoneCheckResult,
    },
    /// Sub-agent run completed — result is injected into the loop as context.
    SubAgentCompleted {
        result: SubAgentResult,
    },
    /// Timeout notification from the SDK (presumably a deadline expiry — confirm against the
    /// `feed` handler).
    Timeout,
}
106
/// Actions the state machine outputs — SDK layer executes the I/O.
///
/// The state machine itself performs no I/O; every variant is a request the SDK must carry out
/// and (where applicable) answer with the matching [`LoopEvent`].
#[derive(Debug)]
pub enum LoopAction {
    /// Structured context ready for a provider call.
    /// `context.system_text` → provider system param.
    /// `context.turns`       → provider messages array (strictly alternating).
    /// `tools`               → tool schemas (skill / memory / knowledge / user tools).
    CallLLM {
        context: RenderedContext,
        tools: Vec<ToolSchema>,
    },
    /// Run these tool calls and feed the outcomes back as `LoopEvent::ToolResults`.
    ExecuteTools {
        calls: Vec<ToolCall>,
    },
    /// The run has finished; `result` is the final outcome.
    Done {
        result: LoopResult,
    },
    /// Kernel requests the SDK to evaluate the current milestone phase.
    ///
    /// The SDK should assess `criteria` against the agent's output using the
    /// specified `verifier`, then feed back `LoopEvent::MilestoneResult { result }`.
    EvaluateMilestone {
        phase_id: String,
        criteria: Vec<String>,
        verifier: Option<crate::types::milestone::MilestoneVerifier>,
        required_evidence: Vec<String>,
    },
    /// Kernel is suspended — SDK must resolve (e.g. human approval) and feed `Resume`.
    ///
    /// NOTE(review): `Resume` is not a [`LoopEvent`] variant; presumably it is delivered through
    /// a separate resume entry point — confirm and link it here.
    AwaitingResume,
}
137
/// Payload held while the loop is in `Suspended`.
///
/// Stored in `LoopStateMachine::suspend_state`; each variant keeps what is needed to continue
/// once the corresponding external resolution arrives.
#[derive(Debug, Clone)]
pub(super) enum SuspendState {
    /// Governance AskUser — awaiting `Resume { approved_calls, denied_calls }`.
    AskUser {
        /// The tool-call batch that is waiting on approval.
        calls: Vec<ToolCall>,
        /// Gating reasons keyed by string (presumably the call id — confirm against the gate).
        gated_reasons: HashMap<String, String>,
    },
    /// Sub-agent spawn — awaiting `SubAgentCompleted` for each listed agent id.
    SubAgentAwait {
        agent_ids: Vec<String>,
    },
}
151
/// Result of running a batch of model-proposed tool calls through the gate
/// (`gate_tool_calls`), as consumed by `LoopStateMachine::feed`.
pub(super) enum GateToolOutcome {
    /// Nothing stopped the batch — the loop goes on to emit `LoopAction::ExecuteTools`.
    Proceed,
    /// The gate produced its own action; `feed` returns it instead of executing the tools.
    Blocked(LoopAction),
    /// The loop was suspended; `feed` returns `LoopAction::AwaitingResume`.
    Suspended,
}
157
/// Snapshot of context lengths captured just before each LLM call.
/// Used internally to restore state on rollback.
///
/// `Default` yields zero lengths and no task-state snapshot.
#[derive(Debug, Clone, Default)]
pub struct TurnCheckpoint {
    /// Length of the history partition at capture time.
    pub history_len: usize,
    /// Length of the signals collection at capture time.
    pub signals_len: usize,
    /// Copy of the context's task state at capture time, if one was taken. Note this is
    /// `context::task_state::TaskState`, not the scheduler's `tcb::TaskState`.
    pub task_state: Option<crate::context::task_state::TaskState>,
}
166
/// Pure state machine for the L* execution loop. No I/O — only state transitions.
///
/// Internal engine backing [`crate::runtime::KernelRuntime`]. Exposed for in-crate
/// use and tests; external callers should drive the kernel through `KernelRuntime`.
#[doc(hidden)]
pub struct LoopStateMachine {
    /// Current intra-turn step. Only meaningful while the root task is `Running`.
    pub phase: LoopPhase,
    /// Turn counter; incremented once per processed `ToolResults` event.
    pub turn: u32,
    /// Context manager owning the history/signal partitions and the token engine.
    pub ctx: ContextManager,
    /// Tool schemas known to this loop.
    pub tools: Vec<ToolSchema>,
    /// Observations produced by the most recent entry-point call; cleared at the start of
    /// `start`, `feed` and `resume_after_preload`.
    pub observations: Vec<KernelObservation>,
    /// Loop policy (budgets and limits) this machine was constructed with.
    pub(super) policy: LoopPolicy,
    /// Tokens accounted to this vehicle so far (LLM responses plus tool results).
    pub(super) total_tokens: u64,
    /// L1 (RunGroup): cumulative tokens spent by *other* members of this run's governance domain,
    /// seeded at boot via `seed_group_budget`. The run-level token cap is enforced against
    /// `group_tokens_base + total_tokens` so the budget spans the whole group, not one vehicle.
    /// 0 (default) ⇒ no group (N=1) ⇒ pre-L1 per-kernel behavior (byte-identical).
    pub(super) group_tokens_base: u64,
    /// L1 (RunGroup): sub-agents spawned by *other* members of this run's governance domain, seeded
    /// at boot. `max_total_subagents` is enforced against `group_spawns_base + local spawns`. 0 ⇒ N=1.
    pub(super) group_spawns_base: u32,
    /// When set, the next LLM call strips tools to force a text response,
    /// then terminates with this reason once the response arrives.
    pub(super) pending_termination: Option<TerminationReason>,
    /// Number of history messages present at session start (after preload_history).
    /// drain_new_messages() returns the slice from this offset onward.
    pub(super) session_history_baseline: usize,
    /// Most recent pre-LLM-call snapshot, used to restore state on rollback.
    pub(super) checkpoint: TurnCheckpoint,
    /// Milestone contract tracker (extracted to reduce state machine bloat).
    pub(super) milestone: MilestoneTracker,
    /// Run specification attached to this loop, if any.
    pub run_spec: Option<AgentRunSpec>,
    /// M1 consolidation: the single source of truth for schedulability *and* sub-agent lineage.
    /// Root is task `"root"`; each sub-agent is a child task carrying its `ProcInfo`. The former
    /// `ProcessTable` is now a derived view over this (`agent_process(es)` rebuild `AgentProcess`
    /// rows on demand via `AgentProcess::from_tcb`).
    pub(super) tasks: TaskTable,
    /// Optional governance pipeline. When set, every tool call proposed by the
    /// model is evaluated before `ExecuteTools` is emitted. `None` (default)
    /// skips the gate entirely, preserving the pre-governance behavior.
    pub(super) governance: Option<GovernancePipeline>,
    /// Optional resource quota evaluated at the syscall trap (M2). `None` (default) leaves spawn /
    /// memory syscalls unconditionally allowed, preserving pre-M2 behavior.
    pub(super) resource_quota: Option<crate::governance::quota::ResourceQuota>,
    /// Timestamps of recent allowed `WriteMemory` syscalls, for the rolling-window rate limit.
    /// Only populated when `resource_quota.memory_writes_per_window` is set.
    pub(super) memory_write_times: Vec<u64>,
    /// Optional long-term memory policy (`set_memory_policy`). `None` (default) preserves
    /// pre-policy behavior: default-rule validation + verbatim retrieval `top_k`.
    pub(super) memory_policy: Option<crate::mm::memory::MemoryPolicy>,
    /// Optional in-kernel signal router. When set, inbound signals are routed
    /// through dedup + attention policy + queue here (the kernel owns disposition).
    /// `None` (default) keeps the legacy hardcoded urgency handling in `feed`.
    ///
    /// NOTE(review): `new()` installs `Some(SignalRouter::new(64))`, so `None` is not what a
    /// freshly constructed machine holds — confirm whether the "default" wording is stale.
    pub(super) signal_router: Option<SignalRouter>,
    /// Wall-clock timestamp of the first `ProviderResult.now_ms` received.
    /// Used by the wall-time budget axis in `SchedulerBudget::should_terminate`.
    pub(super) started_at_ms: Option<u64>,
    /// Most-recent `now_ms` value from `ProviderResult`, forwarded to the budget check.
    pub(super) last_now_ms: Option<u64>,
    /// Tool batch awaiting `Resume` after an AskUser suspend.
    /// (The `SuspendState` type also has a `SubAgentAwait` variant for sub-agent waits.)
    pub(super) suspend_state: Option<SuspendState>,
    /// Denied tool results to merge into the next `ToolResults` feed after resume.
    pub(super) pending_denied_results: Vec<ToolResult>,
    /// W0: an in-flight workflow DAG, when one is loaded. The kernel spawns its ready nodes as
    /// gated batches (each through `evaluate_syscall(Syscall::Spawn)`) and advances on
    /// completions. `None` (default) preserves the single-spawn `spawn_sub_agent` behavior.
    pub(super) workflow: Option<crate::orchestration::workflow::WorkflowRun>,
}
234
235mod signal;
236mod capability;
237mod gate;
238mod eviction;
239mod process;
240mod workflow;
241mod milestone_exec;
242
243impl LoopStateMachine {
244    fn message_tokens(&self, message: &Message) -> u32 {
245        message
246            .token_count
247            .unwrap_or_else(|| self.ctx.engine.count_message(message))
248    }
249
250    pub fn new(policy: LoopPolicy) -> Self {
251        let mut tasks = TaskTable::new();
252        // M1d: the root task carries the authoritative schedulability lifecycle. It starts
253        // `Ready`; `start()`/`resume_*` flip it to `Running`, suspends set `Suspended`, and
254        // `terminate()` sets `Done`. `phase` is now only the intra-turn step.
255        tasks.insert(Tcb::root("root", policy.clone()));
256        Self {
257            // Inert placeholder step; meaningful only while the root task is `Running`.
258            phase: LoopPhase::Reason,
259            turn: 0,
260            ctx: ContextManager::new(policy.max_tokens),
261            tools: Vec::new(),
262            observations: Vec::new(),
263            policy,
264            total_tokens: 0,
265            group_tokens_base: 0,
266            group_spawns_base: 0,
267            pending_termination: None,
268            session_history_baseline: 0,
269            checkpoint: TurnCheckpoint::default(),
270            milestone: MilestoneTracker::new(),
271            run_spec: None,
272            tasks,
273            governance: None,
274            resource_quota: None,
275            memory_write_times: Vec::new(),
276            memory_policy: None,
277            signal_router: Some(SignalRouter::new(64)),
278            started_at_ms: None,
279            last_now_ms: None,
280            suspend_state: None,
281            pending_denied_results: Vec::new(),
282            workflow: None,
283        }
284    }
285
286    /// The authoritative schedulability lifecycle of the loop (root task state). Replaces the
287    /// removed `LoopPhase::{Idle,Suspended,Blocked,Terminal}` reads.
288    pub fn lifecycle(&self) -> TaskState {
289        self.tasks.get("root").map(|t| t.state).unwrap_or(TaskState::Ready)
290    }
291
292    /// The wait reason while suspended/blocked, if any.
293    pub fn wait_reason(&self) -> Option<WaitReason> {
294        self.tasks.get("root").and_then(|t| t.wait.clone())
295    }
296
297    /// Whether the loop has terminated.
298    pub fn is_terminal(&self) -> bool {
299        matches!(self.lifecycle(), TaskState::Done(_))
300    }
301
302    /// Whether the loop is suspended awaiting external resolution.
303    pub fn is_suspended(&self) -> bool {
304        matches!(self.lifecycle(), TaskState::Suspended)
305    }
306
307    /// Set the root task's lifecycle (and wait reason). Single mutation point for schedulability.
308    fn set_lifecycle(&mut self, state: TaskState, wait: Option<WaitReason>) {
309        if let Some(root) = self.tasks.get_mut("root") {
310            root.state = state;
311            root.wait = wait;
312        } else {
313            let mut root = Tcb::root("root", self.policy.clone());
314            root.state = state;
315            root.wait = wait;
316            self.tasks.insert(root);
317        }
318    }
319
320    /// Build a transient root [`Tcb`] mirroring the current scheduling facts (budget counters,
321    /// wall-clock anchors, lifecycle). M1b uses this to run the pure `schedule()` spine in
322    /// parallel with the legacy budget path; later milestones promote it to the live task row.
323    fn root_tcb(&self) -> Tcb {
324        let mut tcb = Tcb::root("root", self.policy.clone());
325        tcb.budget.turns = self.turn;
326        // L1: the token-budget axis is evaluated against the whole governance domain's cumulative
327        // spend (this vehicle's `total_tokens` plus other members' `group_tokens_base`).
328        tcb.budget.total_tokens = self.total_tokens.saturating_add(self.group_tokens_base);
329        tcb.budget.started_at_ms = self.started_at_ms;
330        tcb.state = self.lifecycle();
331        tcb
332    }
333
    /// Adjust the wall-clock budget axis at runtime.
    ///
    /// Overwrites `policy.max_wall_ms` with `max_wall_ms` (which may be `None`).
    pub fn set_wall_budget(&mut self, max_wall_ms: Option<u64>) {
        self.policy.max_wall_ms = max_wall_ms;
    }
338
    /// Install a governance pipeline. Once set, all model-proposed tool calls
    /// are evaluated before execution. Denied/rate-limited calls roll the turn
    /// back (reusing the `GovernanceDenied` path); `AskUser` calls surface a
    /// `ToolGated` observation for the SDK to enforce.
    ///
    /// Replaces any previously installed pipeline.
    pub fn set_governance(&mut self, pipeline: GovernancePipeline) {
        self.governance = Some(pipeline);
    }
346
    /// Install resource quotas (M2). Once set, `Spawn` and `WriteMemory` syscalls are bounded by
    /// the quota at the trap. Not setting it (the default) leaves them unconditionally allowed.
    ///
    /// Replaces any previously installed quota.
    pub fn set_resource_quota(&mut self, quota: crate::governance::quota::ResourceQuota) {
        self.resource_quota = Some(quota);
    }
352
    /// L1 (RunGroup): seed the cumulative tokens already spent by other members of this run's
    /// governance domain. The run-level token cap is then enforced against the group total. Seeding
    /// 0 (the default) preserves pre-L1 per-vehicle behavior.
    ///
    /// Overwrites (does not add to) any previously seeded value.
    pub fn seed_group_budget(&mut self, tokens_spent: u64) {
        self.group_tokens_base = tokens_spent;
    }
359
    /// L1 (RunGroup): seed the sub-agents already spawned by other members of this run's governance
    /// domain. `max_total_subagents` is then enforced against the group total. 0 ⇒ pre-L1 behavior.
    ///
    /// Overwrites (does not add to) any previously seeded value.
    pub fn seed_group_spawns(&mut self, subagents_spawned: u32) {
        self.group_spawns_base = subagents_spawned;
    }
365
366    /// L1: this vehicle's cumulative sub-agent spawns this run — every child task ever registered in
367    /// the `TaskTable` (running + completed), distinct from the *instantaneous* running count. Used
368    /// for the cumulative spawn quota and read back by the SDK to charge the group ledger at run end.
369    pub fn local_subagents_spawned(&self) -> u32 {
370        self.tasks.all().iter().filter(|t| t.proc.is_some()).count() as u32
371    }
372
    /// Install the long-term memory policy (`set_memory_policy`). Once set it gates `write_memory`
    /// validation and bounds `query_memory` retrieval breadth. Not setting it (the default)
    /// preserves pre-policy behavior.
    ///
    /// Replaces any previously installed policy.
    pub fn set_memory_policy(&mut self, policy: crate::mm::memory::MemoryPolicy) {
        self.memory_policy = Some(policy);
    }
379
    /// The installed memory policy, if any. `None` means default-rule validation + verbatim top_k.
    ///
    /// Returns a borrow; the policy stays owned by the state machine.
    pub fn memory_policy(&self) -> Option<&crate::mm::memory::MemoryPolicy> {
        self.memory_policy.as_ref()
    }
384
385    /// Feed the current wall-clock time (ms) to scheduler/governance budget axes.
386    pub fn set_observed_time(&mut self, now_ms: u64) {
387        if self.started_at_ms.is_none() {
388            self.started_at_ms = Some(now_ms);
389        }
390        self.last_now_ms = Some(now_ms);
391        if let Some(pipeline) = self.governance.as_mut() {
392            pipeline.set_time(now_ms);
393        }
394    }
395
396    /// Pre-populate the history partition with messages from a prior session.
397    ///
398    /// Call **before** `start()` when resuming a conversation. Sets the baseline
399    /// so `drain_new_messages()` returns only the messages from the current run.
400    pub fn preload_history(&mut self, messages: Vec<Message>) {
401        for msg in messages {
402            let tokens = self.message_tokens(&msg);
403            self.ctx.push_history(msg, tokens);
404        }
405        self.session_history_baseline = self.ctx.partitions.history.messages.len();
406    }
407
408    /// Continue from preloaded history without appending a new user turn.
409    /// Use after `preload_history` when recovering a session that ended mid-run.
410    ///
411    /// If the last assistant turn has tool calls without matching tool results,
412    /// resumes with `ExecuteTools` instead of calling the LLM again.
413    pub fn resume_after_preload(&mut self) -> LoopAction {
414        self.observations.clear();
415        let calls = crate::runtime::repair::pending_tool_calls_from_messages(
416            &self.ctx.partitions.history.messages,
417        );
418        if !calls.is_empty() {
419            self.emit_page_in_requested(&calls);
420            self.phase = LoopPhase::Act {
421                tool_calls: calls.clone(),
422            };
423            self.set_lifecycle(TaskState::Running, None);
424            return LoopAction::ExecuteTools { calls };
425        }
426        self.phase = LoopPhase::Reason;
427        self.emit_call_llm()
428    }
429
430    /// Return all messages added to history during the current run
431    /// (since the last `preload_history` call or since construction).
432    ///
433    /// Call after `LoopAction::Done` to get the complete turn transcript
434    /// for persistence to a SessionStore.
435    pub fn drain_new_messages(&self) -> Vec<Message> {
436        let history = &self.ctx.partitions.history.messages;
437        let start = self.session_history_baseline.min(history.len());
438        history[start..].to_vec()
439    }
440
441    pub fn start(&mut self, task: RuntimeTask) -> LoopAction {
442        self.observations.clear();
443        self.ctx.init_task(task.goal.clone(), task.criteria.clone());
444
445        let user_msg = "Proceed with the task described in [TASK STATE].".to_string();
446
447        // User message goes into history so it appears at the correct chronological
448        // position: [prior turns...] → [current user message] — LLM reads left-to-right
449        // and responds to the last message. working is reserved for runtime signals only.
450        // Estimate tokens (1 token ≈ 4 chars) with a minimum of 1 so the renderer
451        // does not skip this message (it skips zero-token entries).
452        let user_tokens = self.ctx.engine.count(&user_msg).max(1);
453        self.ctx.push_history(Message::user(user_msg), user_tokens);
454        self.phase = LoopPhase::Reason;
455        // Root task (seeded `Ready` in `new()`) becomes `Running`; `emit_call_llm` sets it.
456        self.emit_call_llm()
457    }
458
459    pub fn feed(&mut self, event: LoopEvent) -> LoopAction {
460        self.observations.clear();
461        self.sweep_expired_leases();
462
463        match event {
464            LoopEvent::Start { task } => self.start(task),
465
466            LoopEvent::LLMResponse { message } => {
467                let tokens = self.message_tokens(&message);
468                self.total_tokens += tokens as u64;
469
470                if let Some(reason) = self.pending_termination.take() {
471                    return self.terminate(reason, Some(message));
472                }
473
474                if message.tool_calls.is_empty() {
475                    // When a milestone contract is active and not yet complete,
476                    // request evaluation instead of terminating.
477                    if !self.milestone.is_complete() {
478                        let phase_id = self.milestone.current_phase_id().unwrap_or("").to_string();
479                        let criteria = self.milestone.current_criteria().to_vec();
480                        let (verifier, required_evidence) = self
481                            .milestone
482                            .contract
483                            .as_ref()
484                            .and_then(|c| c.phases.get(self.milestone.current_phase))
485                            .map(|p| (p.verifier.clone(), p.required_evidence.clone()))
486                            .unwrap_or_default();
487                        // `tokens` was already computed for this message above.
488                        self.ctx.push_history(message, tokens);
489                        return LoopAction::EvaluateMilestone {
490                            phase_id,
491                            criteria,
492                            verifier,
493                            required_evidence,
494                        };
495                    }
496                    return self.terminate(TerminationReason::Completed, Some(message));
497                }
498
499                let calls = message.tool_calls.clone();
500                self.ctx.push_history(message, tokens);
501
502                // ━━ 记录活动时间(Layer 3时间衰减使用)
503                if let Some(now_ms) = self.last_now_ms {
504                    self.ctx.record_activity(now_ms);
505                }
506
507                // 2b: record this turn's tool activity into the task-state recency log (meta-tools
508                // filtered inside). The State-turn footer renders it as "just did: …" + a forward
509                // nudge / STOP, so progress is kernel-derived and never depends on the model
510                // remembering to call `update_plan`. Tool *names* live only on the request (results
511                // carry call_id only), so this is the turn to capture them.
512                //
513                // Capture name AND a compact arg digest: the no-progress STOP keys on whether the
514                // SAME call repeats, and a legit loop (same tool, DIFFERENT args — e.g. processing 20
515                // items) is real progress, not a stall. Keying on the name alone false-positives those
516                // loops; including args distinguishes "step(n=1), step(n=2)…" from a true repeat.
517                let action_sigs: Vec<(String, String)> = calls
518                    .iter()
519                    .map(|c| (c.name.to_string(), compact_tool_args(&c.arguments)))
520                    .collect();
521                self.ctx.note_tool_actions(&action_sigs);
522
523                match self.gate_tool_calls(&calls) {
524                    GateToolOutcome::Blocked(action) => return action,
525                    GateToolOutcome::Suspended => return LoopAction::AwaitingResume,
526                    GateToolOutcome::Proceed => {}
527                }
528                self.emit_page_in_requested(&calls);
529                self.phase = LoopPhase::Act {
530                    tool_calls: calls.clone(),
531                };
532                self.set_lifecycle(TaskState::Running, None);
533                LoopAction::ExecuteTools { calls }
534            }
535
536            LoopEvent::ToolResults { mut results } => {
537                if !self.pending_denied_results.is_empty() {
538                    results.append(&mut self.pending_denied_results);
539                }
540                if let Some(reason) = results
541                    .iter()
542                    .find_map(|result| self.rollback_reason_for_tool_result(result))
543                {
544                    let note = Message::user(super::rollback::build_rollback_note(
545                        &reason,
546                        self.ctx.config.verbose_control_notes,
547                    ));
548                    self.rollback(reason);
549                    self.ctx.push_signal(note.content.as_text().unwrap_or_default().to_string());
550                    self.phase = LoopPhase::Reason;
551                    return self.emit_call_llm();
552                }
553                // Non-fatal errors are committed to history so the LLM can
554                // see them and self-correct without losing turn state.
555
556                for r in &results {
557                    self.total_tokens += r.token_count.unwrap_or(0) as u64;
558                    // Preserve Content::Parts (structured / multimodal tool output).
559                    // Parts are serialised to JSON so the text can be restored faithfully.
560                    let raw_output = match &r.output {
561                        Content::Text(s) => s.clone(),
562                        Content::Parts(parts) => serde_json::to_string(parts).unwrap_or_default(),
563                    };
564                    // Layer 1 spool: oversized results keep only a preview in context; the kernel
565                    // emits `LargeResultSpooled` so the SDK persists the full output it still holds.
566                    let (output, spooled) = match crate::mm::plan_spool(
567                        &raw_output,
568                        self.ctx.config.spool_threshold_bytes,
569                        self.ctx.config.spool_preview_bytes,
570                    ) {
571                        Some(decision) => {
572                            self.observations.push(KernelObservation::LargeResultSpooled {
573                                turn: self.turn,
574                                call_id: r.call_id.to_string(),
575                                // ToolResult carries no tool name; the SDK maps call_id -> tool.
576                                tool: String::new(),
577                                original_size: decision.original_size,
578                                preview_size: decision.preview.len() as u32,
579                                spool_ref: None,
580                            });
581                            (decision.preview, true)
582                        }
583                        None => (raw_output, false),
584                    };
585                    let parts = vec![ContentPart::ToolResult {
586                        call_id: r.call_id.clone(),
587                        output,
588                        is_error: r.is_error,
589                    }];
590                    let tool_msg = Message::tool(parts);
591                    // When spooled, `r.token_count` reflects the full output — recount the preview.
592                    let tokens = if spooled {
593                        self.ctx.engine.count_message(&tool_msg)
594                    } else {
595                        r.token_count
596                            .unwrap_or_else(|| self.ctx.engine.count_message(&tool_msg))
597                    };
598                    self.ctx.push_history(tool_msg, tokens);
599                    // Layer 1: a spooled result's handle is marked SpooledOut (its full output now
600                    // lives on disk via the SDK); the SDK maps call_id -> the persisted ref.
601                    if spooled {
602                        self.ctx.mark_spooled(&r.call_id, r.call_id.to_string());
603                    }
604                }
605                self.turn += 1;
606
607                // M1 收口: the pure `schedule()` is now the single budget decision point.
608                // It evaluates the same three axes (turn/token/wall) via `BudgetLedger`, which
609                // delegates to `SchedulerBudget::should_terminate` internally — one source of truth.
610                if let ScheduleDecision::Terminate { reason: term, .. } =
611                    super::tcb::schedule(&self.root_tcb(), self.last_now_ms)
612                {
613                    let budget = match term {
614                        TerminationReason::MaxTurns => "max_turns",
615                        TerminationReason::Timeout => "wall_time",
616                        _ => "token_budget",
617                    };
618                    self.observations.push(KernelObservation::BudgetExceeded {
619                        turn: self.turn,
620                        budget: budget.to_string(),
621                    });
622                    self.pending_termination = Some(term);
623                    self.phase = LoopPhase::Reason;
624                    return self.emit_call_llm();
625                }
626
627                // ━━ Eviction checkpoint (M3): one decision model (`plan_eviction`), one
628                // execution funnel (`execute_eviction_op`). Layer 3 (idle/time-decay) must run
629                // before the rho recommendation is read, since it mutates token usage — so the
630                // plan is built in that interleaved order and the ops are executed in plan order.
631                let idle_decay = self
632                    .last_now_ms
633                    .is_some_and(|now_ms| self.ctx.should_time_decay_compact(now_ms));
634                if idle_decay {
635                    self.execute_eviction_op(&crate::mm::EvictionOp::TimeDecayMicro);
636                }
637
638                // Layer 4 read-time projection: recompute handle residency on the post-time-decay rho.
639                self.ctx.recompute_handle_residency();
640                self.phase = LoopPhase::Delta {
641                    pressure: self.ctx.rho(),
642                };
643
644                // Layers 2/4/5: execute the pressure-driven ops from the plan (skip TimeDecayMicro
645                // if already executed). The plan carries specific ops stamped with real config-derived
646                // params (W1-1 収口 — no magic-number placeholders), not the umbrella `Pressure` wrapper.
647                let (target_tokens, preserve_turns) = self.ctx.plan_compaction_params();
648                let plan =
649                    crate::mm::plan_eviction(self.ctx.should_compress(), idle_decay, target_tokens, preserve_turns);
650                // `idle_decay` ⇒ the plan carries a `TimeDecayMicro` (so the skip-on-already-executed
651                // below is meaningful). The converse does NOT hold: a pressure-driven `MicroCompact`
652                // also emits `TimeDecayMicro` independent of `idle_decay` (W1 unified planner), so we
653                // assert the implication, not equality.
654                debug_assert!(!idle_decay || plan.has_time_decay());
655                for op in &plan.ops {
656                    // Skip TimeDecayMicro if we already executed it (prevents double-execution).
657                    if matches!(op, crate::mm::EvictionOp::TimeDecayMicro) && idle_decay {
658                        continue;
659                    }
660                    self.execute_eviction_op(op);
661                }
662
663                // Renewal: when compression alone cannot recover enough headroom,
664                // start a new sprint — carry forward system + memory + last N history turns.
665                if self.ctx.should_renew() {
666                    self.ctx.renew();
667                    // A new sprint is a session boundary for signal identity: clear the dedup set so
668                    // it cannot grow unbounded across a long run, and so a signal seen in a prior
669                    // sprint may legitimately re-fire in the new one.
670                    if let Some(router) = self.signal_router.as_mut() {
671                        router.clear_dedup();
672                    }
673                    self.observations.push(KernelObservation::Renewed {
674                        sprint: self.ctx.sprint,
675                    });
676                }
677
678                // Turn boundary: drain any kernel-queued signals into context so they
679                // are seen on the next reasoning turn (ready queue → running).
680                self.drain_queued_signals();
681
682                self.phase = LoopPhase::Reason;
683                self.emit_call_llm()
684            }
685
686            LoopEvent::Signal { signal } => {
687                // `feed` always returns an action; non-actionable dispositions
688                // (queue/observe/ignore) fall back to a plain provider call here.
689                // The kernel-routed path (`dispatch_signal`) is driven via the ABI.
690                self.dispatch_signal(signal)
691                    .unwrap_or_else(|| self.emit_call_llm())
692            }
693
694            LoopEvent::MilestoneResult { result } => self.handle_milestone_result(result),
695
696            LoopEvent::SubAgentCompleted { result } => self.handle_sub_agent_completed(result),
697
698            LoopEvent::Timeout => {
699                let reason = RollbackReason::Timeout;
700                let note = Message::user(super::rollback::build_rollback_note(
701                    &reason,
702                    self.ctx.config.verbose_control_notes,
703                ));
704                self.rollback(reason);
705                self.ctx.push_signal(note.content.as_text().unwrap_or_default().to_string());
706                self.phase = LoopPhase::Reason;
707                self.emit_call_llm()
708            }
709        }
710    }
711
712
713    /// Drain observations emitted during the last `start`/`feed` call.
714    pub fn take_observations(&mut self) -> Vec<KernelObservation> {
715        std::mem::take(&mut self.observations)
716    }
717
718    /// W2-2: Create a snapshot of the current kernel state for crash recovery or migration.
719    pub fn snapshot(&self) -> crate::runtime::snapshot::KernelSnapshot {
720        use crate::runtime::snapshot::{ContextSnapshot, KernelSnapshot};
721        let context = ContextSnapshot::from_context(&self.ctx);
722        KernelSnapshot::from_state(
723            self.turn,
724            self.total_tokens,
725            &self.tasks,
726            &context,
727            self.run_spec.as_ref(),
728        )
729    }
730
731    /// W2-2: Restore kernel state from a snapshot. Returns a new LoopStateMachine rebuilt from the snapshot.
732    /// Note: This is a foundational restore - some state (governance, milestone, signal router dedup) is
733    /// recreated from policy/config rather than serialized, following the principle that strategy is data.
734    pub fn restore(snap: &crate::runtime::snapshot::KernelSnapshot) -> Self {
735        use crate::signals::router::SignalRouter;
736
737        // Reconstruct policy from the max_tokens in snapshot
738        let policy = crate::scheduler::policy::LoopPolicy {
739            max_tokens: snap.context.max_tokens,
740            ..Default::default()
741        };
742
743        // Rebuild TaskTable from snapshot TCBs
744        let mut tasks = TaskTable::new();
745        for tcb_snap in &snap.tasks {
746            if let Some(tcb) = snap.restore_tcb(tcb_snap) {
747                tasks.insert(tcb);
748            }
749        }
750
751        // Rebuild context partitions from snapshot
752        let mut ctx = ContextManager::new(snap.context.max_tokens);
753        ctx.sprint = snap.context.sprint;
754
755        // Restore messages
756        for msg in &snap.context.system_messages {
757            let tokens = ctx.engine.count_message(msg);
758            ctx.partitions.system.push(msg.clone(), tokens);
759        }
760        for msg in &snap.context.knowledge_messages {
761            let tokens = ctx.engine.count_message(msg);
762            ctx.partitions.knowledge.push(msg.clone(), tokens);
763        }
764        for msg in &snap.context.history_messages {
765            let tokens = ctx.engine.count_message(msg);
766            ctx.partitions.history.push(msg.clone(), tokens);
767        }
768
769        // Restore task state
770        if let Some(goal) = &snap.context.task_goal {
771            ctx.partitions.task_state.goal = goal.clone();
772        }
773        if let Some(plan_json) = &snap.context.task_plan {
774            if let Ok(plan_steps) = serde_json::from_str::<Vec<crate::context::task_state::PlanStep>>(plan_json) {
775                ctx.partitions.task_state.plan = plan_steps;
776            }
777        }
778        if let Some(progress) = &snap.context.task_progress {
779            ctx.partitions.task_state.progress = progress.clone();
780        }
781        ctx.partitions.task_state.directives = snap.context.task_directives.clone();
782
783        // Restore signals
784        ctx.partitions.signals = snap.context.signals.clone();
785
786        Self {
787            phase: LoopPhase::Reason,
788            turn: snap.turn,
789            ctx,
790            tools: Vec::new(),  // Tools are rebuilt from capabilities on next LLM call
791            observations: Vec::new(),
792            policy,
793            total_tokens: snap.total_tokens,
794            // Re-seeded from the replayed `ConfigureRun` (strategy is data, not serialized state).
795            group_tokens_base: 0,
796            group_spawns_base: 0,
797            pending_termination: None,
798            session_history_baseline: 0,
799            checkpoint: TurnCheckpoint::default(),
800            milestone: crate::scheduler::milestone::MilestoneTracker::new(),
801            run_spec: snap.run_spec(),
802            tasks,
803            governance: None,  // Governance is policy data, recreated from config
804            resource_quota: None,
805            memory_write_times: Vec::new(),
806            memory_policy: None,
807            signal_router: Some(SignalRouter::new(64)),  // Dedup cleared on restore
808            started_at_ms: None,
809            last_now_ms: None,
810            suspend_state: None,
811            pending_denied_results: Vec::new(),
812            workflow: None,
813        }
814    }
815
816    fn terminate(
817        &mut self,
818        termination: TerminationReason,
819        final_message: Option<Message>,
820    ) -> LoopAction {
821        // Commit the final response into history so subsequent session restores
822        // include the complete transcript: user → [tool turns] → final assistant.
823        if let Some(ref msg) = final_message {
824            let tokens = self.message_tokens(msg);
825            self.ctx.push_history(msg.clone(), tokens);
826        }
827        let result = LoopResult {
828            termination,
829            final_message,
830            turns_used: self.turn,
831            total_tokens_used: self.total_tokens,
832            loop_continue: None,
833            classify_branch: None,
834            tournament_winner: None,
835        };
836        self.set_lifecycle(TaskState::Done(termination), None);
837        LoopAction::Done { result }
838    }
839
840    /// Build the `CallLLM` action with a structured `RenderedContext`.
841    /// Meta-tools (skill / memory / knowledge) are appended to the tool list
842    /// when configured. When `pending_termination` is set, tools are stripped
843    /// to force a plain-text response before the loop terminates.
844    fn emit_call_llm(&mut self) -> LoopAction {
845        // Calling the provider is definitionally "running" — the single funnel for entering the
846        // Running lifecycle (covers start, resume, signal-driven turns, budget final-call).
847        self.set_lifecycle(TaskState::Running, None);
848        self.checkpoint.history_len = self.ctx.partitions.history.messages.len();
849        self.checkpoint.signals_len = self.ctx.partitions.signals.len();
850        self.checkpoint.task_state = Some(self.ctx.partitions.task_state.clone());
851        self.observations.push(KernelObservation::CheckpointTaken {
852            turn: self.turn,
853            history_len: self.checkpoint.history_len as u32,
854        });
855
856        let context = self.ctx.render();
857        if self.pending_termination.is_some() {
858            return LoopAction::CallLLM {
859                context,
860                tools: Vec::new(),
861            };
862        }
863        let mut tools = self.tools.clone();
864        tools.extend(self.ctx.meta_tool_schemas());
865
866        if let Some(ref spec) = self.run_spec {
867            use crate::types::capability::CapabilityKind;
868            tools.retain(|tool| {
869                let kind = match tool.name.as_str() {
870                    "skill" => CapabilityKind::Skill,
871                    "memory" => CapabilityKind::Memory,
872                    "knowledge" => CapabilityKind::Knowledge,
873                    _ => CapabilityKind::Tool,
874                };
875                let desc = crate::types::capability::CapabilityDescriptor::marker(
876                    kind,
877                    tool.name.clone(),
878                    &tool.description,
879                );
880                spec.capability_filter.allows(&desc)
881            });
882        }
883
884        // P1-B epoch skill gating (applied *after* the run-level filter ③, so A is the outer bound
885        // and B narrows within it — D6). When skills are active and declare tools, expose only
886        // `meta-tools ∪ stable-core ∪ ⋃(active skills' allowed_tools)`. `None` ⇒ no active/declared
887        // skill ⇒ no narrowing (D3, errs-open). Meta-tools are always exempt (D5) so the model can
888        // still load more skills. Byte-stable within an epoch: the set only changes on activation.
889        if let Some(allowed) = self.ctx.active_skill_tool_filter() {
890            let stable = &self.ctx.stable_core_tools;
891            tools.retain(|tool| {
892                matches!(tool.name.as_str(), "skill" | "memory" | "knowledge" | "update_plan")
893                    || stable.contains(&tool.name)
894                    || allowed.contains(&tool.name)
895            });
896        }
897
898        LoopAction::CallLLM { context, tools }
899    }
900
901    pub fn rollback(&mut self, reason: RollbackReason) {
902        self.ctx.partitions.history.messages.truncate(self.checkpoint.history_len);
903        self.ctx.partitions.signals.truncate(self.checkpoint.signals_len);
904        if let Some(ref state) = self.checkpoint.task_state {
905            self.ctx.partitions.task_state = state.clone();
906        }
907        self.observations.push(KernelObservation::Rollbacked {
908            turn: self.turn,
909            checkpoint_history_len: self.checkpoint.history_len as u32,
910            reason: Some(reason),
911        });
912    }
913
914    fn rollback_reason_for_tool_result(&self, result: &ToolResult) -> Option<RollbackReason> {
915        let tool_name = self.tool_name_for_call(&result.call_id);
916        let output = super::rollback::tool_result_output_text(result);
917
918        if result.is_fatal {
919            return Some(RollbackReason::FatalToolError {
920                tool_name,
921                error: output,
922            });
923        }
924
925        match result.error_kind {
926            Some(ToolErrorKind::Fatal) => Some(RollbackReason::FatalToolError {
927                tool_name,
928                error: output,
929            }),
930            Some(ToolErrorKind::GovernanceDenied) => Some(RollbackReason::GovernanceDenied {
931                tool_name,
932                reason: output,
933            }),
934            Some(ToolErrorKind::ProviderFailure) => {
935                Some(RollbackReason::ProviderFailure { error: output })
936            }
937            Some(ToolErrorKind::Timeout) => Some(RollbackReason::Timeout),
938            Some(ToolErrorKind::UserInterrupt) => Some(RollbackReason::UserInterrupt),
939            Some(ToolErrorKind::Recoverable) | None => None,
940        }
941    }
942
943    fn tool_name_for_call(&self, call_id: &compact_str::CompactString) -> String {
944        match &self.phase {
945            LoopPhase::Act { tool_calls } => tool_calls
946                .iter()
947                .find(|call| call.id == *call_id)
948                .map(|call| call.name.to_string())
949                .unwrap_or_else(|| call_id.to_string()),
950            _ => call_id.to_string(),
951        }
952    }
953}
954
955#[cfg(test)]
956#[path = "tests.rs"]
957mod tests;