1use std::collections::HashMap;
2
3use super::milestone::MilestoneTracker;
4use super::policy::LoopPolicy;
5use super::tcb::{ScheduleDecision, TaskState, TaskTable, Tcb, WaitReason};
6use crate::AgentRunSpec;
7use crate::context::manager::ContextManager;
8use crate::governance::pipeline::GovernancePipeline;
9use crate::signals::router::SignalRouter;
10use crate::types::result::SubAgentResult;
11use crate::context::renderer::RenderedContext;
12pub use crate::runtime::kernel::KernelObservation;
15use crate::runtime::session::RollbackReason;
16use crate::types::message::{
17 Content, ContentPart, Message, ToolCall, ToolErrorKind, ToolResult, ToolSchema,
18};
19use crate::types::milestone::MilestoneCheckResult;
20use crate::types::result::{LoopResult, TerminationReason};
21use crate::types::signal::RuntimeSignal;
22use crate::types::task::RuntimeTask;
23
24#[derive(Debug, Clone)]
32pub enum LoopPhase {
33 Reason,
34 Act { tool_calls: Vec<ToolCall> },
35 Observe { results: Vec<ToolResult> },
36 Delta { pressure: f64 },
37}
38
/// Why a run is suspended.
///
/// NOTE(review): not referenced elsewhere in this file; the variant meanings below are
/// read off the names only — confirm against the callers.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SuspendReason {
    /// Presumably: waiting for an answer from the user.
    AskUser,
    /// Presumably: waiting for one or more sub-agents to finish.
    SubAgentAwait,
    /// Presumably: suspended by something outside the loop.
    External,
}
49
/// Why a run is blocked.
///
/// NOTE(review): not referenced elsewhere in this file; variant meanings are inferred
/// from their names — confirm against the callers.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BlockReason {
    /// Presumably: a tool call put the run on hold.
    ToolSuspend,
    /// Presumably: waiting for a milestone verdict.
    MilestoneAwait,
}
58
59#[derive(Debug)]
61pub enum LoopEvent {
62 Start {
63 task: RuntimeTask,
64 },
65 LLMResponse {
66 message: Message,
67 },
68 ToolResults {
69 results: Vec<ToolResult>,
70 },
71 Signal {
73 signal: RuntimeSignal,
74 },
75 MilestoneResult {
78 result: MilestoneCheckResult,
79 },
80 SubAgentCompleted {
82 result: SubAgentResult,
83 },
84 Timeout,
85}
86
87#[derive(Debug)]
89pub enum LoopAction {
90 CallLLM {
95 context: RenderedContext,
96 tools: Vec<ToolSchema>,
97 },
98 ExecuteTools {
99 calls: Vec<ToolCall>,
100 },
101 Done {
102 result: LoopResult,
103 },
104 EvaluateMilestone {
109 phase_id: String,
110 criteria: Vec<String>,
111 verifier: Option<crate::types::milestone::MilestoneVerifier>,
112 required_evidence: Vec<String>,
113 },
114 AwaitingResume,
116}
117
118#[derive(Debug, Clone)]
120pub(super) enum SuspendState {
121 AskUser {
123 calls: Vec<ToolCall>,
124 gated_reasons: HashMap<String, String>,
125 },
126 SubAgentAwait {
128 agent_ids: Vec<String>,
129 },
130}
131
132pub(super) enum GateToolOutcome {
133 Proceed,
134 Blocked(LoopAction),
135 Suspended,
136}
137
138#[derive(Debug, Clone, Default)]
141pub struct TurnCheckpoint {
142 pub history_len: usize,
143 pub signals_len: usize,
144 pub task_state: Option<crate::context::task_state::TaskState>,
145}
146
147#[doc(hidden)]
152pub struct LoopStateMachine {
153 pub phase: LoopPhase,
154 pub turn: u32,
155 pub ctx: ContextManager,
156 pub tools: Vec<ToolSchema>,
157 pub observations: Vec<KernelObservation>,
158 pub(super) policy: LoopPolicy,
159 pub(super) total_tokens: u64,
160 pub(super) pending_termination: Option<TerminationReason>,
163 pub(super) session_history_baseline: usize,
166 pub(super) checkpoint: TurnCheckpoint,
167 pub(super) milestone: MilestoneTracker,
169 pub run_spec: Option<AgentRunSpec>,
170 pub(super) tasks: TaskTable,
175 pub(super) governance: Option<GovernancePipeline>,
179 pub(super) resource_quota: Option<crate::governance::quota::ResourceQuota>,
182 pub(super) memory_write_times: Vec<u64>,
185 pub(super) memory_policy: Option<crate::mm::memory::MemoryPolicy>,
188 pub(super) signal_router: Option<SignalRouter>,
192 pub(super) started_at_ms: Option<u64>,
195 pub(super) last_now_ms: Option<u64>,
197 pub(super) suspend_state: Option<SuspendState>,
199 pub(super) pending_denied_results: Vec<ToolResult>,
201 pub(super) workflow: Option<crate::orchestration::workflow::WorkflowRun>,
205}
206
207mod signal;
208mod capability;
209mod gate;
210mod eviction;
211mod process;
212mod workflow;
213mod milestone_exec;
214
215impl LoopStateMachine {
216 fn message_tokens(&self, message: &Message) -> u32 {
217 message
218 .token_count
219 .unwrap_or_else(|| self.ctx.engine.count_message(message))
220 }
221
222 pub fn new(policy: LoopPolicy) -> Self {
223 let mut tasks = TaskTable::new();
224 tasks.insert(Tcb::root("root", policy.clone()));
228 Self {
229 phase: LoopPhase::Reason,
231 turn: 0,
232 ctx: ContextManager::new(policy.max_tokens),
233 tools: Vec::new(),
234 observations: Vec::new(),
235 policy,
236 total_tokens: 0,
237 pending_termination: None,
238 session_history_baseline: 0,
239 checkpoint: TurnCheckpoint::default(),
240 milestone: MilestoneTracker::new(),
241 run_spec: None,
242 tasks,
243 governance: None,
244 resource_quota: None,
245 memory_write_times: Vec::new(),
246 memory_policy: None,
247 signal_router: Some(SignalRouter::new(64)),
248 started_at_ms: None,
249 last_now_ms: None,
250 suspend_state: None,
251 pending_denied_results: Vec::new(),
252 workflow: None,
253 }
254 }
255
256 pub fn lifecycle(&self) -> TaskState {
259 self.tasks.get("root").map(|t| t.state).unwrap_or(TaskState::Ready)
260 }
261
262 pub fn wait_reason(&self) -> Option<WaitReason> {
264 self.tasks.get("root").and_then(|t| t.wait.clone())
265 }
266
267 pub fn is_terminal(&self) -> bool {
269 matches!(self.lifecycle(), TaskState::Done(_))
270 }
271
272 pub fn is_suspended(&self) -> bool {
274 matches!(self.lifecycle(), TaskState::Suspended)
275 }
276
277 fn set_lifecycle(&mut self, state: TaskState, wait: Option<WaitReason>) {
279 if let Some(root) = self.tasks.get_mut("root") {
280 root.state = state;
281 root.wait = wait;
282 } else {
283 let mut root = Tcb::root("root", self.policy.clone());
284 root.state = state;
285 root.wait = wait;
286 self.tasks.insert(root);
287 }
288 }
289
290 fn root_tcb(&self) -> Tcb {
294 let mut tcb = Tcb::root("root", self.policy.clone());
295 tcb.budget.turns = self.turn;
296 tcb.budget.total_tokens = self.total_tokens;
297 tcb.budget.started_at_ms = self.started_at_ms;
298 tcb.state = self.lifecycle();
299 tcb
300 }
301
302 pub fn set_wall_budget(&mut self, max_wall_ms: Option<u64>) {
304 self.policy.max_wall_ms = max_wall_ms;
305 }
306
307 pub fn set_governance(&mut self, pipeline: GovernancePipeline) {
312 self.governance = Some(pipeline);
313 }
314
315 pub fn set_resource_quota(&mut self, quota: crate::governance::quota::ResourceQuota) {
318 self.resource_quota = Some(quota);
319 }
320
321 pub fn set_memory_policy(&mut self, policy: crate::mm::memory::MemoryPolicy) {
325 self.memory_policy = Some(policy);
326 }
327
328 pub fn memory_policy(&self) -> Option<&crate::mm::memory::MemoryPolicy> {
330 self.memory_policy.as_ref()
331 }
332
333 pub fn set_observed_time(&mut self, now_ms: u64) {
335 if self.started_at_ms.is_none() {
336 self.started_at_ms = Some(now_ms);
337 }
338 self.last_now_ms = Some(now_ms);
339 if let Some(pipeline) = self.governance.as_mut() {
340 pipeline.set_time(now_ms);
341 }
342 }
343
344 pub fn preload_history(&mut self, messages: Vec<Message>) {
349 for msg in messages {
350 let tokens = self.message_tokens(&msg);
351 self.ctx.push_history(msg, tokens);
352 }
353 self.session_history_baseline = self.ctx.partitions.history.messages.len();
354 }
355
356 pub fn resume_after_preload(&mut self) -> LoopAction {
362 self.observations.clear();
363 let calls = crate::runtime::repair::pending_tool_calls_from_messages(
364 &self.ctx.partitions.history.messages,
365 );
366 if !calls.is_empty() {
367 self.emit_page_in_requested(&calls);
368 self.phase = LoopPhase::Act {
369 tool_calls: calls.clone(),
370 };
371 self.set_lifecycle(TaskState::Running, None);
372 return LoopAction::ExecuteTools { calls };
373 }
374 self.phase = LoopPhase::Reason;
375 self.emit_call_llm()
376 }
377
378 pub fn drain_new_messages(&self) -> Vec<Message> {
384 let history = &self.ctx.partitions.history.messages;
385 let start = self.session_history_baseline.min(history.len());
386 history[start..].to_vec()
387 }
388
389 pub fn start(&mut self, task: RuntimeTask) -> LoopAction {
390 self.observations.clear();
391 self.ctx.init_task(task.goal.clone(), task.criteria.clone());
392
393 let user_msg = "Proceed with the task described in [TASK STATE].".to_string();
394
395 let user_tokens = self.ctx.engine.count(&user_msg).max(1);
401 self.ctx.push_history(Message::user(user_msg), user_tokens);
402 self.phase = LoopPhase::Reason;
403 self.emit_call_llm()
405 }
406
407 pub fn feed(&mut self, event: LoopEvent) -> LoopAction {
408 self.observations.clear();
409 self.sweep_expired_leases();
410
411 match event {
412 LoopEvent::Start { task } => self.start(task),
413
414 LoopEvent::LLMResponse { message } => {
415 let tokens = self.message_tokens(&message);
416 self.total_tokens += tokens as u64;
417
418 if let Some(reason) = self.pending_termination.take() {
419 return self.terminate(reason, Some(message));
420 }
421
422 if message.tool_calls.is_empty() {
423 if !self.milestone.is_complete() {
426 let phase_id = self.milestone.current_phase_id().unwrap_or("").to_string();
427 let criteria = self.milestone.current_criteria().to_vec();
428 let (verifier, required_evidence) = self
429 .milestone
430 .contract
431 .as_ref()
432 .and_then(|c| c.phases.get(self.milestone.current_phase))
433 .map(|p| (p.verifier.clone(), p.required_evidence.clone()))
434 .unwrap_or_default();
435 self.ctx.push_history(message, tokens);
437 return LoopAction::EvaluateMilestone {
438 phase_id,
439 criteria,
440 verifier,
441 required_evidence,
442 };
443 }
444 return self.terminate(TerminationReason::Completed, Some(message));
445 }
446
447 let calls = message.tool_calls.clone();
448 self.ctx.push_history(message, tokens);
449
450 if let Some(now_ms) = self.last_now_ms {
452 self.ctx.record_activity(now_ms);
453 }
454
455 match self.gate_tool_calls(&calls) {
456 GateToolOutcome::Blocked(action) => return action,
457 GateToolOutcome::Suspended => return LoopAction::AwaitingResume,
458 GateToolOutcome::Proceed => {}
459 }
460 self.emit_page_in_requested(&calls);
461 self.phase = LoopPhase::Act {
462 tool_calls: calls.clone(),
463 };
464 self.set_lifecycle(TaskState::Running, None);
465 LoopAction::ExecuteTools { calls }
466 }
467
468 LoopEvent::ToolResults { mut results } => {
469 if !self.pending_denied_results.is_empty() {
470 results.append(&mut self.pending_denied_results);
471 }
472 if let Some(reason) = results
473 .iter()
474 .find_map(|result| self.rollback_reason_for_tool_result(result))
475 {
476 let note = Message::user(super::rollback::build_rollback_note(
477 &reason,
478 self.ctx.config.verbose_control_notes,
479 ));
480 self.rollback(reason);
481 self.ctx.push_signal(note.content.as_text().unwrap_or_default().to_string());
482 self.phase = LoopPhase::Reason;
483 return self.emit_call_llm();
484 }
485 for r in &results {
489 self.total_tokens += r.token_count.unwrap_or(0) as u64;
490 let raw_output = match &r.output {
493 Content::Text(s) => s.clone(),
494 Content::Parts(parts) => serde_json::to_string(parts).unwrap_or_default(),
495 };
496 let (output, spooled) = match crate::mm::plan_spool(
499 &raw_output,
500 self.ctx.config.spool_threshold_bytes,
501 self.ctx.config.spool_preview_bytes,
502 ) {
503 Some(decision) => {
504 self.observations.push(KernelObservation::LargeResultSpooled {
505 turn: self.turn,
506 call_id: r.call_id.to_string(),
507 tool: String::new(),
509 original_size: decision.original_size,
510 preview_size: decision.preview.len() as u32,
511 spool_ref: None,
512 });
513 (decision.preview, true)
514 }
515 None => (raw_output, false),
516 };
517 let parts = vec![ContentPart::ToolResult {
518 call_id: r.call_id.clone(),
519 output,
520 is_error: r.is_error,
521 }];
522 let tool_msg = Message::tool(parts);
523 let tokens = if spooled {
525 self.ctx.engine.count_message(&tool_msg)
526 } else {
527 r.token_count
528 .unwrap_or_else(|| self.ctx.engine.count_message(&tool_msg))
529 };
530 self.ctx.push_history(tool_msg, tokens);
531 if spooled {
534 self.ctx.mark_spooled(&r.call_id, r.call_id.to_string());
535 }
536 }
537 self.turn += 1;
538
539 if let ScheduleDecision::Terminate { reason: term, .. } =
543 super::tcb::schedule(&self.root_tcb(), self.last_now_ms)
544 {
545 let budget = match term {
546 TerminationReason::MaxTurns => "max_turns",
547 TerminationReason::Timeout => "wall_time",
548 _ => "token_budget",
549 };
550 self.observations.push(KernelObservation::BudgetExceeded {
551 turn: self.turn,
552 budget: budget.to_string(),
553 });
554 self.pending_termination = Some(term);
555 self.phase = LoopPhase::Reason;
556 return self.emit_call_llm();
557 }
558
559 let idle_decay = self
564 .last_now_ms
565 .is_some_and(|now_ms| self.ctx.should_time_decay_compact(now_ms));
566 if idle_decay {
567 self.execute_eviction_op(&crate::mm::EvictionOp::TimeDecayMicro);
568 }
569
570 self.ctx.recompute_handle_residency();
572 self.phase = LoopPhase::Delta {
573 pressure: self.ctx.rho(),
574 };
575
576 let (target_tokens, preserve_turns) = self.ctx.plan_compaction_params();
580 let plan =
581 crate::mm::plan_eviction(self.ctx.should_compress(), idle_decay, target_tokens, preserve_turns);
582 debug_assert!(!idle_decay || plan.has_time_decay());
587 for op in &plan.ops {
588 if matches!(op, crate::mm::EvictionOp::TimeDecayMicro) && idle_decay {
590 continue;
591 }
592 self.execute_eviction_op(op);
593 }
594
595 if self.ctx.should_renew() {
598 self.ctx.renew();
599 if let Some(router) = self.signal_router.as_mut() {
603 router.clear_dedup();
604 }
605 self.observations.push(KernelObservation::Renewed {
606 sprint: self.ctx.sprint,
607 });
608 }
609
610 self.drain_queued_signals();
613
614 self.phase = LoopPhase::Reason;
615 self.emit_call_llm()
616 }
617
618 LoopEvent::Signal { signal } => {
619 self.dispatch_signal(signal)
623 .unwrap_or_else(|| self.emit_call_llm())
624 }
625
626 LoopEvent::MilestoneResult { result } => self.handle_milestone_result(result),
627
628 LoopEvent::SubAgentCompleted { result } => self.handle_sub_agent_completed(result),
629
630 LoopEvent::Timeout => {
631 let reason = RollbackReason::Timeout;
632 let note = Message::user(super::rollback::build_rollback_note(
633 &reason,
634 self.ctx.config.verbose_control_notes,
635 ));
636 self.rollback(reason);
637 self.ctx.push_signal(note.content.as_text().unwrap_or_default().to_string());
638 self.phase = LoopPhase::Reason;
639 self.emit_call_llm()
640 }
641 }
642 }
643
644
645 pub fn take_observations(&mut self) -> Vec<KernelObservation> {
647 std::mem::take(&mut self.observations)
648 }
649
650 pub fn snapshot(&self) -> crate::runtime::snapshot::KernelSnapshot {
652 use crate::runtime::snapshot::{ContextSnapshot, KernelSnapshot};
653 let context = ContextSnapshot::from_context(&self.ctx);
654 KernelSnapshot::from_state(
655 self.turn,
656 self.total_tokens,
657 &self.tasks,
658 &context,
659 self.run_spec.as_ref(),
660 )
661 }
662
663 pub fn restore(snap: &crate::runtime::snapshot::KernelSnapshot) -> Self {
667 use crate::signals::router::SignalRouter;
668
669 let policy = crate::scheduler::policy::LoopPolicy {
671 max_tokens: snap.context.max_tokens,
672 ..Default::default()
673 };
674
675 let mut tasks = TaskTable::new();
677 for tcb_snap in &snap.tasks {
678 if let Some(tcb) = snap.restore_tcb(tcb_snap) {
679 tasks.insert(tcb);
680 }
681 }
682
683 let mut ctx = ContextManager::new(snap.context.max_tokens);
685 ctx.sprint = snap.context.sprint;
686
687 for msg in &snap.context.system_messages {
689 let tokens = ctx.engine.count_message(msg);
690 ctx.partitions.system.push(msg.clone(), tokens);
691 }
692 for msg in &snap.context.knowledge_messages {
693 let tokens = ctx.engine.count_message(msg);
694 ctx.partitions.knowledge.push(msg.clone(), tokens);
695 }
696 for msg in &snap.context.history_messages {
697 let tokens = ctx.engine.count_message(msg);
698 ctx.partitions.history.push(msg.clone(), tokens);
699 }
700
701 if let Some(goal) = &snap.context.task_goal {
703 ctx.partitions.task_state.goal = goal.clone();
704 }
705 if let Some(plan_json) = &snap.context.task_plan {
706 if let Ok(plan_steps) = serde_json::from_str::<Vec<crate::context::task_state::PlanStep>>(plan_json) {
707 ctx.partitions.task_state.plan = plan_steps;
708 }
709 }
710 if let Some(progress) = &snap.context.task_progress {
711 ctx.partitions.task_state.progress = progress.clone();
712 }
713 ctx.partitions.task_state.directives = snap.context.task_directives.clone();
714
715 ctx.partitions.signals = snap.context.signals.clone();
717
718 Self {
719 phase: LoopPhase::Reason,
720 turn: snap.turn,
721 ctx,
722 tools: Vec::new(), observations: Vec::new(),
724 policy,
725 total_tokens: snap.total_tokens,
726 pending_termination: None,
727 session_history_baseline: 0,
728 checkpoint: TurnCheckpoint::default(),
729 milestone: crate::scheduler::milestone::MilestoneTracker::new(),
730 run_spec: snap.run_spec(),
731 tasks,
732 governance: None, resource_quota: None,
734 memory_write_times: Vec::new(),
735 memory_policy: None,
736 signal_router: Some(SignalRouter::new(64)), started_at_ms: None,
738 last_now_ms: None,
739 suspend_state: None,
740 pending_denied_results: Vec::new(),
741 workflow: None,
742 }
743 }
744
745 fn terminate(
746 &mut self,
747 termination: TerminationReason,
748 final_message: Option<Message>,
749 ) -> LoopAction {
750 if let Some(ref msg) = final_message {
753 let tokens = self.message_tokens(msg);
754 self.ctx.push_history(msg.clone(), tokens);
755 }
756 let result = LoopResult {
757 termination,
758 final_message,
759 turns_used: self.turn,
760 total_tokens_used: self.total_tokens,
761 loop_continue: None,
762 classify_branch: None,
763 tournament_winner: None,
764 };
765 self.set_lifecycle(TaskState::Done(termination), None);
766 LoopAction::Done { result }
767 }
768
769 fn emit_call_llm(&mut self) -> LoopAction {
774 self.set_lifecycle(TaskState::Running, None);
777 self.checkpoint.history_len = self.ctx.partitions.history.messages.len();
778 self.checkpoint.signals_len = self.ctx.partitions.signals.len();
779 self.checkpoint.task_state = Some(self.ctx.partitions.task_state.clone());
780 self.observations.push(KernelObservation::CheckpointTaken {
781 turn: self.turn,
782 history_len: self.checkpoint.history_len as u32,
783 });
784
785 let context = self.ctx.render();
786 if self.pending_termination.is_some() {
787 return LoopAction::CallLLM {
788 context,
789 tools: Vec::new(),
790 };
791 }
792 let mut tools = self.tools.clone();
793 tools.extend(self.ctx.meta_tool_schemas());
794
795 if let Some(ref spec) = self.run_spec {
796 use crate::types::capability::CapabilityKind;
797 tools.retain(|tool| {
798 let kind = match tool.name.as_str() {
799 "skill" => CapabilityKind::Skill,
800 "memory" => CapabilityKind::Memory,
801 "knowledge" => CapabilityKind::Knowledge,
802 _ => CapabilityKind::Tool,
803 };
804 let desc = crate::types::capability::CapabilityDescriptor::marker(
805 kind,
806 tool.name.clone(),
807 &tool.description,
808 );
809 spec.capability_filter.allows(&desc)
810 });
811 }
812
813 if let Some(allowed) = self.ctx.active_skill_tool_filter() {
819 let stable = &self.ctx.stable_core_tools;
820 tools.retain(|tool| {
821 matches!(tool.name.as_str(), "skill" | "memory" | "knowledge" | "update_plan")
822 || stable.contains(&tool.name)
823 || allowed.contains(&tool.name)
824 });
825 }
826
827 LoopAction::CallLLM { context, tools }
828 }
829
830 pub fn rollback(&mut self, reason: RollbackReason) {
831 self.ctx.partitions.history.messages.truncate(self.checkpoint.history_len);
832 self.ctx.partitions.signals.truncate(self.checkpoint.signals_len);
833 if let Some(ref state) = self.checkpoint.task_state {
834 self.ctx.partitions.task_state = state.clone();
835 }
836 self.observations.push(KernelObservation::Rollbacked {
837 turn: self.turn,
838 checkpoint_history_len: self.checkpoint.history_len as u32,
839 reason: Some(reason),
840 });
841 }
842
843 fn rollback_reason_for_tool_result(&self, result: &ToolResult) -> Option<RollbackReason> {
844 let tool_name = self.tool_name_for_call(&result.call_id);
845 let output = super::rollback::tool_result_output_text(result);
846
847 if result.is_fatal {
848 return Some(RollbackReason::FatalToolError {
849 tool_name,
850 error: output,
851 });
852 }
853
854 match result.error_kind {
855 Some(ToolErrorKind::Fatal) => Some(RollbackReason::FatalToolError {
856 tool_name,
857 error: output,
858 }),
859 Some(ToolErrorKind::GovernanceDenied) => Some(RollbackReason::GovernanceDenied {
860 tool_name,
861 reason: output,
862 }),
863 Some(ToolErrorKind::ProviderFailure) => {
864 Some(RollbackReason::ProviderFailure { error: output })
865 }
866 Some(ToolErrorKind::Timeout) => Some(RollbackReason::Timeout),
867 Some(ToolErrorKind::UserInterrupt) => Some(RollbackReason::UserInterrupt),
868 Some(ToolErrorKind::Recoverable) | None => None,
869 }
870 }
871
872 fn tool_name_for_call(&self, call_id: &compact_str::CompactString) -> String {
873 match &self.phase {
874 LoopPhase::Act { tool_calls } => tool_calls
875 .iter()
876 .find(|call| call.id == *call_id)
877 .map(|call| call.name.to_string())
878 .unwrap_or_else(|| call_id.to_string()),
879 _ => call_id.to_string(),
880 }
881 }
882}
883
884#[cfg(test)]
885#[path = "tests.rs"]
886mod tests;