Skip to main content

deepstrike_core/scheduler/state_machine/
mod.rs

1use std::collections::HashMap;
2
3use super::milestone::MilestoneTracker;
4use super::policy::LoopPolicy;
5use super::tcb::{ScheduleDecision, TaskState, TaskTable, Tcb, WaitReason};
6use crate::AgentRunSpec;
7use crate::context::manager::ContextManager;
8use crate::governance::pipeline::GovernancePipeline;
9use crate::signals::router::SignalRouter;
10use crate::types::result::SubAgentResult;
11use crate::context::renderer::RenderedContext;
12// `pub use` so external integration tests that glob `state_machine::*` resolve the observation
13// type here — exactly as they did for the former `pub enum LoopObservation` this replaced.
14pub use crate::runtime::kernel::KernelObservation;
15use crate::runtime::session::RollbackReason;
16use crate::types::message::{
17    Content, ContentPart, Message, ToolCall, ToolErrorKind, ToolResult, ToolSchema,
18};
19use crate::types::milestone::MilestoneCheckResult;
20use crate::types::result::{LoopResult, TerminationReason};
21use crate::types::signal::RuntimeSignal;
22use crate::types::task::RuntimeTask;
23
/// The *turn step* of the L* execution loop (M1d).
///
/// Schedulability (`Ready/Running/Blocked/Suspended/Done`) is no longer carried here — it lives
/// on the root task's [`TaskState`] in the kernel's `TaskTable`, queried via
/// [`LoopStateMachine::lifecycle`]. `LoopPhase` is now orthogonal: it only records *which step of a
/// running turn* the loop is in. When the task is `Ready/Suspended/Done`, the phase value is
/// inert (left at its last step) and ignored.
#[derive(Debug, Clone)]
pub enum LoopPhase {
    /// The loop is about to ask the model for its next response. `start` and `feed` set this
    /// immediately before emitting a provider call; `new` also uses it as the inert placeholder.
    Reason,
    /// A batch of model-proposed tool calls has been handed to the SDK via
    /// `LoopAction::ExecuteTools`.
    Act { tool_calls: Vec<ToolCall> },
    /// Tool results under observation.
    /// NOTE(review): not assigned anywhere in the visible part of this file — confirm it is
    /// still used.
    Observe { results: Vec<ToolResult> },
    /// Post-tool-result bookkeeping; `pressure` is the value of `ctx.rho()` sampled by `feed`
    /// right after handle residency is recomputed.
    Delta { pressure: f64 },
}
38
/// Why the loop entered `Suspended` state.
///
/// NOTE(review): not referenced in the visible part of this file (the root task's wait is
/// reported as `WaitReason` by `LoopStateMachine::wait_reason`) — confirm it is still used.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SuspendReason {
    /// Governance `AskUser` — waiting for SDK to resolve human approval.
    AskUser,
    /// Sub-agent spawned — waiting for sub-agent to complete.
    SubAgentAwait,
    /// Externally requested suspension.
    External,
}
49
/// What the loop is blocked waiting for.
///
/// NOTE(review): not referenced in the visible part of this file — confirm it is still used
/// now that waits are reported through `WaitReason`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockReason {
    /// Awaiting a tool's continuation (tool suspend pattern).
    ToolSuspend,
    /// Awaiting milestone evaluation result.
    MilestoneAwait,
}
58
/// Events fed into the state machine from the SDK layer.
///
/// Each variant is consumed by [`LoopStateMachine::feed`], which returns the next
/// [`LoopAction`] for the SDK to execute.
#[derive(Debug)]
pub enum LoopEvent {
    /// Begin a run for `task`; `feed` forwards this to `LoopStateMachine::start`.
    Start {
        task: RuntimeTask,
    },
    /// The provider's reply for the current turn. A reply without tool calls either completes
    /// the run or triggers a milestone evaluation; a reply with tool calls is gated and then
    /// surfaced as `LoopAction::ExecuteTools`.
    LLMResponse {
        message: Message,
    },
    /// Results of the tool calls previously requested via `LoopAction::ExecuteTools`.
    ToolResults {
        results: Vec<ToolResult>,
    },
    /// Inbound signal from SignalRouter — Critical/High urgency may interrupt.
    Signal {
        signal: RuntimeSignal,
    },
    /// Result of evaluating the current milestone phase's criteria.
    /// Feed this back after handling `LoopAction::EvaluateMilestone`.
    MilestoneResult {
        result: MilestoneCheckResult,
    },
    /// Sub-agent run completed — result is injected into the loop as context.
    SubAgentCompleted {
        result: SubAgentResult,
    },
    /// The current turn timed out; `feed` rolls the turn back with `RollbackReason::Timeout`
    /// and asks the model again.
    Timeout,
}
86
/// Actions the state machine outputs — SDK layer executes the I/O.
#[derive(Debug)]
pub enum LoopAction {
    /// Structured context ready for a provider call.
    /// `context.system_text` → provider system param.
    /// `context.turns`       → provider messages array (strictly alternating).
    /// `tools`               → tool schemas (skill / memory / knowledge / user tools).
    CallLLM {
        context: RenderedContext,
        tools: Vec<ToolSchema>,
    },
    /// Run these tool calls and feed the outcome back as `LoopEvent::ToolResults`.
    ExecuteTools {
        calls: Vec<ToolCall>,
    },
    /// The run has terminated; `result` carries the final outcome.
    Done {
        result: LoopResult,
    },
    /// Kernel requests the SDK to evaluate the current milestone phase.
    ///
    /// The SDK should assess `criteria` against the agent's output using the
    /// specified `verifier`, then feed back `LoopEvent::MilestoneResult { result }`.
    EvaluateMilestone {
        phase_id: String,
        criteria: Vec<String>,
        verifier: Option<crate::types::milestone::MilestoneVerifier>,
        required_evidence: Vec<String>,
    },
    /// Kernel is suspended — SDK must resolve (e.g. human approval) and feed `Resume`.
    ///
    /// NOTE(review): `LoopEvent` has no `Resume` variant in this file; the resume is presumably
    /// delivered through a separate entry point — confirm and name it here.
    AwaitingResume,
}
117
/// Payload held while the loop is in `Suspended`.
///
/// Stored in `LoopStateMachine::suspend_state`; one variant per kind of suspension.
#[derive(Debug, Clone)]
pub(super) enum SuspendState {
    /// Governance AskUser — awaiting `Resume { approved_calls, denied_calls }`.
    AskUser {
        /// The tool-call batch held back while approval is pending.
        calls: Vec<ToolCall>,
        /// Gating reasons as string pairs.
        /// NOTE(review): presumably keyed by call id with the reason text as value — confirm.
        gated_reasons: HashMap<String, String>,
    },
    /// Sub-agent spawn — awaiting `SubAgentCompleted` for each listed agent id.
    SubAgentAwait {
        agent_ids: Vec<String>,
    },
}
131
132pub(super) enum GateToolOutcome {
133    Proceed,
134    Blocked(LoopAction),
135    Suspended,
136}
137
/// Snapshot of context lengths captured just before each LLM call.
/// Used internally to restore state on rollback.
///
/// `Default` yields zero lengths and no task-state snapshot; that is the value
/// `LoopStateMachine::new` starts with.
#[derive(Debug, Clone, Default)]
pub struct TurnCheckpoint {
    /// Length of the history partition at checkpoint time.
    /// NOTE(review): presumably a message count used to truncate on rollback — confirm.
    pub history_len: usize,
    /// Length of the signals collection at checkpoint time (same caveat as `history_len`).
    pub signals_len: usize,
    /// Copy of the context's task state at checkpoint time, if one was captured.
    pub task_state: Option<crate::context::task_state::TaskState>,
}
146
/// Pure state machine for the L* execution loop. No I/O — only state transitions.
///
/// Internal engine backing [`crate::runtime::KernelRuntime`]. Exposed for in-crate
/// use and tests; external callers should drive the kernel through `KernelRuntime`.
#[doc(hidden)]
pub struct LoopStateMachine {
    /// Current intra-turn step. Only meaningful while the root task is `Running`.
    pub phase: LoopPhase,
    /// Turn counter; `feed` increments it once per processed `ToolResults` batch.
    pub turn: u32,
    /// Context manager owning the partitions (history, signals, task state, …) and the
    /// token-counting engine.
    pub ctx: ContextManager,
    /// Tool schemas held by the loop.
    /// NOTE(review): presumably what `CallLLM` offers to the provider — confirm.
    pub tools: Vec<ToolSchema>,
    /// Observations emitted during the most recent `start`/`feed` call; both clear this on
    /// entry, and `take_observations` hands the buffer to the caller.
    pub observations: Vec<KernelObservation>,
    /// Loop policy (budgets such as `max_tokens` and `max_wall_ms`).
    pub(super) policy: LoopPolicy,
    /// Running token total fed to the budget check (LLM responses plus reported tool-result
    /// token counts).
    pub(super) total_tokens: u64,
    /// When set, the next LLM call strips tools to force a text response,
    /// then terminates with this reason once the response arrives.
    pub(super) pending_termination: Option<TerminationReason>,
    /// Number of history messages present at session start (after preload_history).
    /// drain_new_messages() returns the slice from this offset onward.
    pub(super) session_history_baseline: usize,
    /// Context lengths captured before the latest LLM call, used to restore state on rollback.
    pub(super) checkpoint: TurnCheckpoint,
    /// Milestone contract tracker (extracted to reduce state machine bloat).
    pub(super) milestone: MilestoneTracker,
    /// Run specification, if one is attached; included in `snapshot()`.
    pub run_spec: Option<AgentRunSpec>,
    /// M1 consolidation: the single source of truth for schedulability *and* sub-agent lineage.
    /// Root is task `"root"`; each sub-agent is a child task carrying its `ProcInfo`. The former
    /// `ProcessTable` is now a derived view over this (`agent_process(es)` rebuild `AgentProcess`
    /// rows on demand via `AgentProcess::from_tcb`).
    pub(super) tasks: TaskTable,
    /// Optional governance pipeline. When set, every tool call proposed by the
    /// model is evaluated before `ExecuteTools` is emitted. `None` (default)
    /// skips the gate entirely, preserving the pre-governance behavior.
    pub(super) governance: Option<GovernancePipeline>,
    /// Optional resource quota evaluated at the syscall trap (M2). `None` (default) leaves spawn /
    /// memory syscalls unconditionally allowed, preserving pre-M2 behavior.
    pub(super) resource_quota: Option<crate::governance::quota::ResourceQuota>,
    /// Timestamps of recent allowed `WriteMemory` syscalls, for the rolling-window rate limit.
    /// Only populated when `resource_quota.memory_writes_per_window` is set.
    pub(super) memory_write_times: Vec<u64>,
    /// Optional long-term memory policy (`set_memory_policy`). `None` (default) preserves
    /// pre-policy behavior: default-rule validation + verbatim retrieval `top_k`.
    pub(super) memory_policy: Option<crate::mm::memory::MemoryPolicy>,
    /// Optional in-kernel signal router. When set, inbound signals are routed
    /// through dedup + attention policy + queue here (the kernel owns disposition).
    /// `None` keeps the legacy hardcoded urgency handling in `feed`.
    ///
    /// Note that `new()` installs `Some(SignalRouter::new(64))`, so the routed path — not
    /// `None` — is what a freshly constructed machine uses.
    pub(super) signal_router: Option<SignalRouter>,
    /// Wall-clock timestamp of the first `ProviderResult.now_ms` received.
    /// Used by the wall-time budget axis in `SchedulerBudget::should_terminate`.
    pub(super) started_at_ms: Option<u64>,
    /// Most-recent `now_ms` value from `ProviderResult`, forwarded to the budget check.
    pub(super) last_now_ms: Option<u64>,
    /// Payload for the current suspension, if any: either the tool batch awaiting `Resume`
    /// after an AskUser suspend, or the sub-agent ids being awaited (see `SuspendState`).
    pub(super) suspend_state: Option<SuspendState>,
    /// Denied tool results to merge into the next `ToolResults` feed after resume.
    pub(super) pending_denied_results: Vec<ToolResult>,
    /// W0: an in-flight workflow DAG, when one is loaded. The kernel spawns its ready nodes as
    /// gated batches (each through `evaluate_syscall(Syscall::Spawn)`) and advances on
    /// completions. `None` (default) preserves the single-spawn `spawn_sub_agent` behavior.
    pub(super) workflow: Option<crate::orchestration::workflow::WorkflowRun>,
}
206
207mod signal;
208mod capability;
209mod gate;
210mod eviction;
211mod process;
212mod workflow;
213mod milestone_exec;
214
215impl LoopStateMachine {
216    fn message_tokens(&self, message: &Message) -> u32 {
217        message
218            .token_count
219            .unwrap_or_else(|| self.ctx.engine.count_message(message))
220    }
221
222    pub fn new(policy: LoopPolicy) -> Self {
223        let mut tasks = TaskTable::new();
224        // M1d: the root task carries the authoritative schedulability lifecycle. It starts
225        // `Ready`; `start()`/`resume_*` flip it to `Running`, suspends set `Suspended`, and
226        // `terminate()` sets `Done`. `phase` is now only the intra-turn step.
227        tasks.insert(Tcb::root("root", policy.clone()));
228        Self {
229            // Inert placeholder step; meaningful only while the root task is `Running`.
230            phase: LoopPhase::Reason,
231            turn: 0,
232            ctx: ContextManager::new(policy.max_tokens),
233            tools: Vec::new(),
234            observations: Vec::new(),
235            policy,
236            total_tokens: 0,
237            pending_termination: None,
238            session_history_baseline: 0,
239            checkpoint: TurnCheckpoint::default(),
240            milestone: MilestoneTracker::new(),
241            run_spec: None,
242            tasks,
243            governance: None,
244            resource_quota: None,
245            memory_write_times: Vec::new(),
246            memory_policy: None,
247            signal_router: Some(SignalRouter::new(64)),
248            started_at_ms: None,
249            last_now_ms: None,
250            suspend_state: None,
251            pending_denied_results: Vec::new(),
252            workflow: None,
253        }
254    }
255
256    /// The authoritative schedulability lifecycle of the loop (root task state). Replaces the
257    /// removed `LoopPhase::{Idle,Suspended,Blocked,Terminal}` reads.
258    pub fn lifecycle(&self) -> TaskState {
259        self.tasks.get("root").map(|t| t.state).unwrap_or(TaskState::Ready)
260    }
261
262    /// The wait reason while suspended/blocked, if any.
263    pub fn wait_reason(&self) -> Option<WaitReason> {
264        self.tasks.get("root").and_then(|t| t.wait.clone())
265    }
266
267    /// Whether the loop has terminated.
268    pub fn is_terminal(&self) -> bool {
269        matches!(self.lifecycle(), TaskState::Done(_))
270    }
271
272    /// Whether the loop is suspended awaiting external resolution.
273    pub fn is_suspended(&self) -> bool {
274        matches!(self.lifecycle(), TaskState::Suspended)
275    }
276
277    /// Set the root task's lifecycle (and wait reason). Single mutation point for schedulability.
278    fn set_lifecycle(&mut self, state: TaskState, wait: Option<WaitReason>) {
279        if let Some(root) = self.tasks.get_mut("root") {
280            root.state = state;
281            root.wait = wait;
282        } else {
283            let mut root = Tcb::root("root", self.policy.clone());
284            root.state = state;
285            root.wait = wait;
286            self.tasks.insert(root);
287        }
288    }
289
290    /// Build a transient root [`Tcb`] mirroring the current scheduling facts (budget counters,
291    /// wall-clock anchors, lifecycle). M1b uses this to run the pure `schedule()` spine in
292    /// parallel with the legacy budget path; later milestones promote it to the live task row.
293    fn root_tcb(&self) -> Tcb {
294        let mut tcb = Tcb::root("root", self.policy.clone());
295        tcb.budget.turns = self.turn;
296        tcb.budget.total_tokens = self.total_tokens;
297        tcb.budget.started_at_ms = self.started_at_ms;
298        tcb.state = self.lifecycle();
299        tcb
300    }
301
302    /// Adjust the wall-clock budget axis at runtime.
303    pub fn set_wall_budget(&mut self, max_wall_ms: Option<u64>) {
304        self.policy.max_wall_ms = max_wall_ms;
305    }
306
307    /// Install a governance pipeline. Once set, all model-proposed tool calls
308    /// are evaluated before execution. Denied/rate-limited calls roll the turn
309    /// back (reusing the `GovernanceDenied` path); `AskUser` calls surface a
310    /// `ToolGated` observation for the SDK to enforce.
311    pub fn set_governance(&mut self, pipeline: GovernancePipeline) {
312        self.governance = Some(pipeline);
313    }
314
315    /// Install resource quotas (M2). Once set, `Spawn` and `WriteMemory` syscalls are bounded by
316    /// the quota at the trap. Not setting it (the default) leaves them unconditionally allowed.
317    pub fn set_resource_quota(&mut self, quota: crate::governance::quota::ResourceQuota) {
318        self.resource_quota = Some(quota);
319    }
320
321    /// Install the long-term memory policy (`set_memory_policy`). Once set it gates `write_memory`
322    /// validation and bounds `query_memory` retrieval breadth. Not setting it (the default)
323    /// preserves pre-policy behavior.
324    pub fn set_memory_policy(&mut self, policy: crate::mm::memory::MemoryPolicy) {
325        self.memory_policy = Some(policy);
326    }
327
328    /// The installed memory policy, if any. `None` means default-rule validation + verbatim top_k.
329    pub fn memory_policy(&self) -> Option<&crate::mm::memory::MemoryPolicy> {
330        self.memory_policy.as_ref()
331    }
332
333    /// Feed the current wall-clock time (ms) to scheduler/governance budget axes.
334    pub fn set_observed_time(&mut self, now_ms: u64) {
335        if self.started_at_ms.is_none() {
336            self.started_at_ms = Some(now_ms);
337        }
338        self.last_now_ms = Some(now_ms);
339        if let Some(pipeline) = self.governance.as_mut() {
340            pipeline.set_time(now_ms);
341        }
342    }
343
344    /// Pre-populate the history partition with messages from a prior session.
345    ///
346    /// Call **before** `start()` when resuming a conversation. Sets the baseline
347    /// so `drain_new_messages()` returns only the messages from the current run.
348    pub fn preload_history(&mut self, messages: Vec<Message>) {
349        for msg in messages {
350            let tokens = self.message_tokens(&msg);
351            self.ctx.push_history(msg, tokens);
352        }
353        self.session_history_baseline = self.ctx.partitions.history.messages.len();
354    }
355
356    /// Continue from preloaded history without appending a new user turn.
357    /// Use after `preload_history` when recovering a session that ended mid-run.
358    ///
359    /// If the last assistant turn has tool calls without matching tool results,
360    /// resumes with `ExecuteTools` instead of calling the LLM again.
361    pub fn resume_after_preload(&mut self) -> LoopAction {
362        self.observations.clear();
363        let calls = crate::runtime::repair::pending_tool_calls_from_messages(
364            &self.ctx.partitions.history.messages,
365        );
366        if !calls.is_empty() {
367            self.emit_page_in_requested(&calls);
368            self.phase = LoopPhase::Act {
369                tool_calls: calls.clone(),
370            };
371            self.set_lifecycle(TaskState::Running, None);
372            return LoopAction::ExecuteTools { calls };
373        }
374        self.phase = LoopPhase::Reason;
375        self.emit_call_llm()
376    }
377
378    /// Return all messages added to history during the current run
379    /// (since the last `preload_history` call or since construction).
380    ///
381    /// Call after `LoopAction::Done` to get the complete turn transcript
382    /// for persistence to a SessionStore.
383    pub fn drain_new_messages(&self) -> Vec<Message> {
384        let history = &self.ctx.partitions.history.messages;
385        let start = self.session_history_baseline.min(history.len());
386        history[start..].to_vec()
387    }
388
389    pub fn start(&mut self, task: RuntimeTask) -> LoopAction {
390        self.observations.clear();
391        self.ctx.init_task(task.goal.clone(), task.criteria.clone());
392
393        let user_msg = "Proceed with the task described in [TASK STATE].".to_string();
394
395        // User message goes into history so it appears at the correct chronological
396        // position: [prior turns...] → [current user message] — LLM reads left-to-right
397        // and responds to the last message. working is reserved for runtime signals only.
398        // Estimate tokens (1 token ≈ 4 chars) with a minimum of 1 so the renderer
399        // does not skip this message (it skips zero-token entries).
400        let user_tokens = self.ctx.engine.count(&user_msg).max(1);
401        self.ctx.push_history(Message::user(user_msg), user_tokens);
402        self.phase = LoopPhase::Reason;
403        // Root task (seeded `Ready` in `new()`) becomes `Running`; `emit_call_llm` sets it.
404        self.emit_call_llm()
405    }
406
407    pub fn feed(&mut self, event: LoopEvent) -> LoopAction {
408        self.observations.clear();
409        self.sweep_expired_leases();
410
411        match event {
412            LoopEvent::Start { task } => self.start(task),
413
414            LoopEvent::LLMResponse { message } => {
415                let tokens = self.message_tokens(&message);
416                self.total_tokens += tokens as u64;
417
418                if let Some(reason) = self.pending_termination.take() {
419                    return self.terminate(reason, Some(message));
420                }
421
422                if message.tool_calls.is_empty() {
423                    // When a milestone contract is active and not yet complete,
424                    // request evaluation instead of terminating.
425                    if !self.milestone.is_complete() {
426                        let phase_id = self.milestone.current_phase_id().unwrap_or("").to_string();
427                        let criteria = self.milestone.current_criteria().to_vec();
428                        let (verifier, required_evidence) = self
429                            .milestone
430                            .contract
431                            .as_ref()
432                            .and_then(|c| c.phases.get(self.milestone.current_phase))
433                            .map(|p| (p.verifier.clone(), p.required_evidence.clone()))
434                            .unwrap_or_default();
435                        // `tokens` was already computed for this message above.
436                        self.ctx.push_history(message, tokens);
437                        return LoopAction::EvaluateMilestone {
438                            phase_id,
439                            criteria,
440                            verifier,
441                            required_evidence,
442                        };
443                    }
444                    return self.terminate(TerminationReason::Completed, Some(message));
445                }
446
447                let calls = message.tool_calls.clone();
448                self.ctx.push_history(message, tokens);
449
450                // ━━ 记录活动时间(Layer 3时间衰减使用)
451                if let Some(now_ms) = self.last_now_ms {
452                    self.ctx.record_activity(now_ms);
453                }
454
455                match self.gate_tool_calls(&calls) {
456                    GateToolOutcome::Blocked(action) => return action,
457                    GateToolOutcome::Suspended => return LoopAction::AwaitingResume,
458                    GateToolOutcome::Proceed => {}
459                }
460                self.emit_page_in_requested(&calls);
461                self.phase = LoopPhase::Act {
462                    tool_calls: calls.clone(),
463                };
464                self.set_lifecycle(TaskState::Running, None);
465                LoopAction::ExecuteTools { calls }
466            }
467
468            LoopEvent::ToolResults { mut results } => {
469                if !self.pending_denied_results.is_empty() {
470                    results.append(&mut self.pending_denied_results);
471                }
472                if let Some(reason) = results
473                    .iter()
474                    .find_map(|result| self.rollback_reason_for_tool_result(result))
475                {
476                    let note = Message::user(super::rollback::build_rollback_note(
477                        &reason,
478                        self.ctx.config.verbose_control_notes,
479                    ));
480                    self.rollback(reason);
481                    self.ctx.push_signal(note.content.as_text().unwrap_or_default().to_string());
482                    self.phase = LoopPhase::Reason;
483                    return self.emit_call_llm();
484                }
485                // Non-fatal errors are committed to history so the LLM can
486                // see them and self-correct without losing turn state.
487
488                for r in &results {
489                    self.total_tokens += r.token_count.unwrap_or(0) as u64;
490                    // Preserve Content::Parts (structured / multimodal tool output).
491                    // Parts are serialised to JSON so the text can be restored faithfully.
492                    let raw_output = match &r.output {
493                        Content::Text(s) => s.clone(),
494                        Content::Parts(parts) => serde_json::to_string(parts).unwrap_or_default(),
495                    };
496                    // Layer 1 spool: oversized results keep only a preview in context; the kernel
497                    // emits `LargeResultSpooled` so the SDK persists the full output it still holds.
498                    let (output, spooled) = match crate::mm::plan_spool(
499                        &raw_output,
500                        self.ctx.config.spool_threshold_bytes,
501                        self.ctx.config.spool_preview_bytes,
502                    ) {
503                        Some(decision) => {
504                            self.observations.push(KernelObservation::LargeResultSpooled {
505                                turn: self.turn,
506                                call_id: r.call_id.to_string(),
507                                // ToolResult carries no tool name; the SDK maps call_id -> tool.
508                                tool: String::new(),
509                                original_size: decision.original_size,
510                                preview_size: decision.preview.len() as u32,
511                                spool_ref: None,
512                            });
513                            (decision.preview, true)
514                        }
515                        None => (raw_output, false),
516                    };
517                    let parts = vec![ContentPart::ToolResult {
518                        call_id: r.call_id.clone(),
519                        output,
520                        is_error: r.is_error,
521                    }];
522                    let tool_msg = Message::tool(parts);
523                    // When spooled, `r.token_count` reflects the full output — recount the preview.
524                    let tokens = if spooled {
525                        self.ctx.engine.count_message(&tool_msg)
526                    } else {
527                        r.token_count
528                            .unwrap_or_else(|| self.ctx.engine.count_message(&tool_msg))
529                    };
530                    self.ctx.push_history(tool_msg, tokens);
531                    // Layer 1: a spooled result's handle is marked SpooledOut (its full output now
532                    // lives on disk via the SDK); the SDK maps call_id -> the persisted ref.
533                    if spooled {
534                        self.ctx.mark_spooled(&r.call_id, r.call_id.to_string());
535                    }
536                }
537                self.turn += 1;
538
539                // M1 收口: the pure `schedule()` is now the single budget decision point.
540                // It evaluates the same three axes (turn/token/wall) via `BudgetLedger`, which
541                // delegates to `SchedulerBudget::should_terminate` internally — one source of truth.
542                if let ScheduleDecision::Terminate { reason: term, .. } =
543                    super::tcb::schedule(&self.root_tcb(), self.last_now_ms)
544                {
545                    let budget = match term {
546                        TerminationReason::MaxTurns => "max_turns",
547                        TerminationReason::Timeout => "wall_time",
548                        _ => "token_budget",
549                    };
550                    self.observations.push(KernelObservation::BudgetExceeded {
551                        turn: self.turn,
552                        budget: budget.to_string(),
553                    });
554                    self.pending_termination = Some(term);
555                    self.phase = LoopPhase::Reason;
556                    return self.emit_call_llm();
557                }
558
559                // ━━ Eviction checkpoint (M3): one decision model (`plan_eviction`), one
560                // execution funnel (`execute_eviction_op`). Layer 3 (idle/time-decay) must run
561                // before the rho recommendation is read, since it mutates token usage — so the
562                // plan is built in that interleaved order and the ops are executed in plan order.
563                let idle_decay = self
564                    .last_now_ms
565                    .is_some_and(|now_ms| self.ctx.should_time_decay_compact(now_ms));
566                if idle_decay {
567                    self.execute_eviction_op(&crate::mm::EvictionOp::TimeDecayMicro);
568                }
569
570                // Layer 4 read-time projection: recompute handle residency on the post-time-decay rho.
571                self.ctx.recompute_handle_residency();
572                self.phase = LoopPhase::Delta {
573                    pressure: self.ctx.rho(),
574                };
575
576                // Layers 2/4/5: execute the pressure-driven ops from the plan (skip TimeDecayMicro
577                // if already executed). The plan carries specific ops stamped with real config-derived
578                // params (W1-1 収口 — no magic-number placeholders), not the umbrella `Pressure` wrapper.
579                let (target_tokens, preserve_turns) = self.ctx.plan_compaction_params();
580                let plan =
581                    crate::mm::plan_eviction(self.ctx.should_compress(), idle_decay, target_tokens, preserve_turns);
582                // `idle_decay` ⇒ the plan carries a `TimeDecayMicro` (so the skip-on-already-executed
583                // below is meaningful). The converse does NOT hold: a pressure-driven `MicroCompact`
584                // also emits `TimeDecayMicro` independent of `idle_decay` (W1 unified planner), so we
585                // assert the implication, not equality.
586                debug_assert!(!idle_decay || plan.has_time_decay());
587                for op in &plan.ops {
588                    // Skip TimeDecayMicro if we already executed it (prevents double-execution).
589                    if matches!(op, crate::mm::EvictionOp::TimeDecayMicro) && idle_decay {
590                        continue;
591                    }
592                    self.execute_eviction_op(op);
593                }
594
595                // Renewal: when compression alone cannot recover enough headroom,
596                // start a new sprint — carry forward system + memory + last N history turns.
597                if self.ctx.should_renew() {
598                    self.ctx.renew();
599                    // A new sprint is a session boundary for signal identity: clear the dedup set so
600                    // it cannot grow unbounded across a long run, and so a signal seen in a prior
601                    // sprint may legitimately re-fire in the new one.
602                    if let Some(router) = self.signal_router.as_mut() {
603                        router.clear_dedup();
604                    }
605                    self.observations.push(KernelObservation::Renewed {
606                        sprint: self.ctx.sprint,
607                    });
608                }
609
610                // Turn boundary: drain any kernel-queued signals into context so they
611                // are seen on the next reasoning turn (ready queue → running).
612                self.drain_queued_signals();
613
614                self.phase = LoopPhase::Reason;
615                self.emit_call_llm()
616            }
617
618            LoopEvent::Signal { signal } => {
619                // `feed` always returns an action; non-actionable dispositions
620                // (queue/observe/ignore) fall back to a plain provider call here.
621                // The kernel-routed path (`dispatch_signal`) is driven via the ABI.
622                self.dispatch_signal(signal)
623                    .unwrap_or_else(|| self.emit_call_llm())
624            }
625
626            LoopEvent::MilestoneResult { result } => self.handle_milestone_result(result),
627
628            LoopEvent::SubAgentCompleted { result } => self.handle_sub_agent_completed(result),
629
630            LoopEvent::Timeout => {
631                let reason = RollbackReason::Timeout;
632                let note = Message::user(super::rollback::build_rollback_note(
633                    &reason,
634                    self.ctx.config.verbose_control_notes,
635                ));
636                self.rollback(reason);
637                self.ctx.push_signal(note.content.as_text().unwrap_or_default().to_string());
638                self.phase = LoopPhase::Reason;
639                self.emit_call_llm()
640            }
641        }
642    }
643
644
645    /// Drain observations emitted during the last `start`/`feed` call.
646    pub fn take_observations(&mut self) -> Vec<KernelObservation> {
647        std::mem::take(&mut self.observations)
648    }
649
650    /// W2-2: Create a snapshot of the current kernel state for crash recovery or migration.
651    pub fn snapshot(&self) -> crate::runtime::snapshot::KernelSnapshot {
652        use crate::runtime::snapshot::{ContextSnapshot, KernelSnapshot};
653        let context = ContextSnapshot::from_context(&self.ctx);
654        KernelSnapshot::from_state(
655            self.turn,
656            self.total_tokens,
657            &self.tasks,
658            &context,
659            self.run_spec.as_ref(),
660        )
661    }
662
663    /// W2-2: Restore kernel state from a snapshot. Returns a new LoopStateMachine rebuilt from the snapshot.
664    /// Note: This is a foundational restore - some state (governance, milestone, signal router dedup) is
665    /// recreated from policy/config rather than serialized, following the principle that strategy is data.
666    pub fn restore(snap: &crate::runtime::snapshot::KernelSnapshot) -> Self {
667        use crate::signals::router::SignalRouter;
668
669        // Reconstruct policy from the max_tokens in snapshot
670        let policy = crate::scheduler::policy::LoopPolicy {
671            max_tokens: snap.context.max_tokens,
672            ..Default::default()
673        };
674
675        // Rebuild TaskTable from snapshot TCBs
676        let mut tasks = TaskTable::new();
677        for tcb_snap in &snap.tasks {
678            if let Some(tcb) = snap.restore_tcb(tcb_snap) {
679                tasks.insert(tcb);
680            }
681        }
682
683        // Rebuild context partitions from snapshot
684        let mut ctx = ContextManager::new(snap.context.max_tokens);
685        ctx.sprint = snap.context.sprint;
686
687        // Restore messages
688        for msg in &snap.context.system_messages {
689            let tokens = ctx.engine.count_message(msg);
690            ctx.partitions.system.push(msg.clone(), tokens);
691        }
692        for msg in &snap.context.knowledge_messages {
693            let tokens = ctx.engine.count_message(msg);
694            ctx.partitions.knowledge.push(msg.clone(), tokens);
695        }
696        for msg in &snap.context.history_messages {
697            let tokens = ctx.engine.count_message(msg);
698            ctx.partitions.history.push(msg.clone(), tokens);
699        }
700
701        // Restore task state
702        if let Some(goal) = &snap.context.task_goal {
703            ctx.partitions.task_state.goal = goal.clone();
704        }
705        if let Some(plan_json) = &snap.context.task_plan {
706            if let Ok(plan_steps) = serde_json::from_str::<Vec<crate::context::task_state::PlanStep>>(plan_json) {
707                ctx.partitions.task_state.plan = plan_steps;
708            }
709        }
710        if let Some(progress) = &snap.context.task_progress {
711            ctx.partitions.task_state.progress = progress.clone();
712        }
713        ctx.partitions.task_state.directives = snap.context.task_directives.clone();
714
715        // Restore signals
716        ctx.partitions.signals = snap.context.signals.clone();
717
718        Self {
719            phase: LoopPhase::Reason,
720            turn: snap.turn,
721            ctx,
722            tools: Vec::new(),  // Tools are rebuilt from capabilities on next LLM call
723            observations: Vec::new(),
724            policy,
725            total_tokens: snap.total_tokens,
726            pending_termination: None,
727            session_history_baseline: 0,
728            checkpoint: TurnCheckpoint::default(),
729            milestone: crate::scheduler::milestone::MilestoneTracker::new(),
730            run_spec: snap.run_spec(),
731            tasks,
732            governance: None,  // Governance is policy data, recreated from config
733            resource_quota: None,
734            memory_write_times: Vec::new(),
735            memory_policy: None,
736            signal_router: Some(SignalRouter::new(64)),  // Dedup cleared on restore
737            started_at_ms: None,
738            last_now_ms: None,
739            suspend_state: None,
740            pending_denied_results: Vec::new(),
741            workflow: None,
742        }
743    }
744
745    fn terminate(
746        &mut self,
747        termination: TerminationReason,
748        final_message: Option<Message>,
749    ) -> LoopAction {
750        // Commit the final response into history so subsequent session restores
751        // include the complete transcript: user → [tool turns] → final assistant.
752        if let Some(ref msg) = final_message {
753            let tokens = self.message_tokens(msg);
754            self.ctx.push_history(msg.clone(), tokens);
755        }
756        let result = LoopResult {
757            termination,
758            final_message,
759            turns_used: self.turn,
760            total_tokens_used: self.total_tokens,
761            loop_continue: None,
762            classify_branch: None,
763            tournament_winner: None,
764        };
765        self.set_lifecycle(TaskState::Done(termination), None);
766        LoopAction::Done { result }
767    }
768
769    /// Build the `CallLLM` action with a structured `RenderedContext`.
770    /// Meta-tools (skill / memory / knowledge) are appended to the tool list
771    /// when configured. When `pending_termination` is set, tools are stripped
772    /// to force a plain-text response before the loop terminates.
773    fn emit_call_llm(&mut self) -> LoopAction {
774        // Calling the provider is definitionally "running" — the single funnel for entering the
775        // Running lifecycle (covers start, resume, signal-driven turns, budget final-call).
776        self.set_lifecycle(TaskState::Running, None);
777        self.checkpoint.history_len = self.ctx.partitions.history.messages.len();
778        self.checkpoint.signals_len = self.ctx.partitions.signals.len();
779        self.checkpoint.task_state = Some(self.ctx.partitions.task_state.clone());
780        self.observations.push(KernelObservation::CheckpointTaken {
781            turn: self.turn,
782            history_len: self.checkpoint.history_len as u32,
783        });
784
785        let context = self.ctx.render();
786        if self.pending_termination.is_some() {
787            return LoopAction::CallLLM {
788                context,
789                tools: Vec::new(),
790            };
791        }
792        let mut tools = self.tools.clone();
793        tools.extend(self.ctx.meta_tool_schemas());
794
795        if let Some(ref spec) = self.run_spec {
796            use crate::types::capability::CapabilityKind;
797            tools.retain(|tool| {
798                let kind = match tool.name.as_str() {
799                    "skill" => CapabilityKind::Skill,
800                    "memory" => CapabilityKind::Memory,
801                    "knowledge" => CapabilityKind::Knowledge,
802                    _ => CapabilityKind::Tool,
803                };
804                let desc = crate::types::capability::CapabilityDescriptor::marker(
805                    kind,
806                    tool.name.clone(),
807                    &tool.description,
808                );
809                spec.capability_filter.allows(&desc)
810            });
811        }
812
813        // P1-B epoch skill gating (applied *after* the run-level filter ③, so A is the outer bound
814        // and B narrows within it — D6). When skills are active and declare tools, expose only
815        // `meta-tools ∪ stable-core ∪ ⋃(active skills' allowed_tools)`. `None` ⇒ no active/declared
816        // skill ⇒ no narrowing (D3, errs-open). Meta-tools are always exempt (D5) so the model can
817        // still load more skills. Byte-stable within an epoch: the set only changes on activation.
818        if let Some(allowed) = self.ctx.active_skill_tool_filter() {
819            let stable = &self.ctx.stable_core_tools;
820            tools.retain(|tool| {
821                matches!(tool.name.as_str(), "skill" | "memory" | "knowledge" | "update_plan")
822                    || stable.contains(&tool.name)
823                    || allowed.contains(&tool.name)
824            });
825        }
826
827        LoopAction::CallLLM { context, tools }
828    }
829
830    pub fn rollback(&mut self, reason: RollbackReason) {
831        self.ctx.partitions.history.messages.truncate(self.checkpoint.history_len);
832        self.ctx.partitions.signals.truncate(self.checkpoint.signals_len);
833        if let Some(ref state) = self.checkpoint.task_state {
834            self.ctx.partitions.task_state = state.clone();
835        }
836        self.observations.push(KernelObservation::Rollbacked {
837            turn: self.turn,
838            checkpoint_history_len: self.checkpoint.history_len as u32,
839            reason: Some(reason),
840        });
841    }
842
843    fn rollback_reason_for_tool_result(&self, result: &ToolResult) -> Option<RollbackReason> {
844        let tool_name = self.tool_name_for_call(&result.call_id);
845        let output = super::rollback::tool_result_output_text(result);
846
847        if result.is_fatal {
848            return Some(RollbackReason::FatalToolError {
849                tool_name,
850                error: output,
851            });
852        }
853
854        match result.error_kind {
855            Some(ToolErrorKind::Fatal) => Some(RollbackReason::FatalToolError {
856                tool_name,
857                error: output,
858            }),
859            Some(ToolErrorKind::GovernanceDenied) => Some(RollbackReason::GovernanceDenied {
860                tool_name,
861                reason: output,
862            }),
863            Some(ToolErrorKind::ProviderFailure) => {
864                Some(RollbackReason::ProviderFailure { error: output })
865            }
866            Some(ToolErrorKind::Timeout) => Some(RollbackReason::Timeout),
867            Some(ToolErrorKind::UserInterrupt) => Some(RollbackReason::UserInterrupt),
868            Some(ToolErrorKind::Recoverable) | None => None,
869        }
870    }
871
872    fn tool_name_for_call(&self, call_id: &compact_str::CompactString) -> String {
873        match &self.phase {
874            LoopPhase::Act { tool_calls } => tool_calls
875                .iter()
876                .find(|call| call.id == *call_id)
877                .map(|call| call.name.to_string())
878                .unwrap_or_else(|| call_id.to_string()),
879            _ => call_id.to_string(),
880        }
881    }
882}
883
884#[cfg(test)]
885#[path = "tests.rs"]
886mod tests;