1use serde::{Deserialize, Serialize};
9
10use crate::context::pressure::PressureAction;
11use crate::context::renderer::RenderedContext;
12use crate::context::task_state::TaskUpdate;
13use crate::context::token_engine::ContextTokenEngine;
14use crate::runtime::session::RollbackReason;
15use crate::scheduler::policy::LoopPolicy;
16use crate::scheduler::state_machine::{LoopAction, LoopEvent, LoopStateMachine};
17use crate::types::agent::AgentRunSpec;
18use crate::types::capability::{CapabilityCommand, CapabilityDescriptor, CapabilityKind};
19use crate::types::message::{Message, ToolCall, ToolResult, ToolSchema};
20use crate::types::milestone::{MilestoneCheckResult, MilestoneContract};
21use crate::types::result::{LoopResult, SubAgentResult};
22use crate::types::signal::RuntimeSignal;
23use crate::types::skill::SkillMetadata;
24use crate::types::task::RuntimeTask;
25
/// ABI version stamped onto every [`KernelInput`] built with [`KernelInput::new`] and onto every
/// [`KernelStep`] produced by this module.
pub const KERNEL_ABI_VERSION: u32 = 1;
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
32#[serde(rename_all = "snake_case")]
33pub enum PolicyAction {
34 Allow,
35 Deny,
36 AskUser,
37}
38
39impl From<PolicyAction> for crate::governance::permission::PermissionAction {
40 fn from(action: PolicyAction) -> Self {
41 match action {
42 PolicyAction::Allow => Self::Allow,
43 PolicyAction::Deny => Self::Deny,
44 PolicyAction::AskUser => Self::AskUser,
45 }
46 }
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct PolicyRule {
52 pub tool_pattern: String,
53 pub action: PolicyAction,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct RateLimitSpec {
60 pub tool: String,
61 pub max_calls: u32,
62 pub window_ms: u64,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
69#[serde(tag = "kind", rename_all = "snake_case")]
70pub enum ConstraintSpec {
71 Required { tool: String, path: String },
73 Enum {
75 tool: String,
76 path: String,
77 values: Vec<String>,
78 },
79 Range {
81 tool: String,
82 path: String,
83 #[serde(default, skip_serializing_if = "Option::is_none")]
84 min: Option<f64>,
85 #[serde(default, skip_serializing_if = "Option::is_none")]
86 max: Option<f64>,
87 },
88}
89
/// Serde default for `KernelInputEvent::SetAttentionPolicy::max_queue_size`.
fn default_signal_queue_size() -> u32 {
    const DEFAULT_SIGNAL_QUEUE_SIZE: u32 = 64;
    DEFAULT_SIGNAL_QUEUE_SIZE
}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct KernelInput {
96 pub version: u32,
97 pub event: KernelInputEvent,
98}
99
100impl KernelInput {
101 pub fn new(event: KernelInputEvent) -> Self {
102 Self {
103 version: KERNEL_ABI_VERSION,
104 event,
105 }
106 }
107}
108
109#[derive(Debug, Clone, Default, Serialize, Deserialize)]
112pub struct GovernanceConfig {
113 #[serde(default, skip_serializing_if = "Option::is_none")]
114 pub default_action: Option<PolicyAction>,
115 #[serde(default, skip_serializing_if = "Vec::is_empty")]
116 pub rules: Vec<PolicyRule>,
117 #[serde(default, skip_serializing_if = "Vec::is_empty")]
118 pub vetoed_tools: Vec<String>,
119 #[serde(default, skip_serializing_if = "Vec::is_empty")]
120 pub rate_limits: Vec<RateLimitSpec>,
121 #[serde(default, skip_serializing_if = "Vec::is_empty")]
122 pub constraints: Vec<ConstraintSpec>,
123}
124
125#[derive(Debug, Clone, Default, Serialize, Deserialize)]
129pub struct RunConfig {
130 #[serde(default, skip_serializing_if = "Option::is_none")]
131 pub tools: Option<Vec<ToolSchema>>,
132 #[serde(default, skip_serializing_if = "Option::is_none")]
133 pub available_skills: Option<Vec<SkillMetadata>>,
134 #[serde(default, skip_serializing_if = "Option::is_none")]
135 pub stable_core_tools: Option<Vec<String>>,
136 #[serde(default, skip_serializing_if = "Option::is_none")]
137 pub memory_enabled: Option<bool>,
138 #[serde(default, skip_serializing_if = "Option::is_none")]
139 pub knowledge_enabled: Option<bool>,
140 #[serde(default, skip_serializing_if = "Option::is_none")]
141 pub plan_tool_enabled: Option<bool>,
142 #[serde(default, skip_serializing_if = "Option::is_none")]
144 pub tokenizer: Option<String>,
145 #[serde(default, skip_serializing_if = "Option::is_none")]
146 pub governance: Option<GovernanceConfig>,
147 #[serde(default, skip_serializing_if = "Option::is_none")]
148 pub attention_max_queue_size: Option<u32>,
149 #[serde(default, skip_serializing_if = "Option::is_none")]
150 pub scheduler_max_wall_ms: Option<u64>,
151 #[serde(default, skip_serializing_if = "Option::is_none")]
152 pub resource_quota: Option<crate::governance::quota::ResourceQuota>,
153 #[serde(default, skip_serializing_if = "Option::is_none")]
157 pub group_tokens_base: Option<u64>,
158 #[serde(default, skip_serializing_if = "Option::is_none")]
162 pub group_spawns_base: Option<u32>,
163}
164
165pub(crate) fn build_governance_pipeline(
169 default_action: Option<PolicyAction>,
170 rules: Vec<PolicyRule>,
171 vetoed_tools: Vec<String>,
172 rate_limits: Vec<RateLimitSpec>,
173 constraints: Vec<ConstraintSpec>,
174) -> crate::governance::pipeline::GovernancePipeline {
175 use crate::governance::constraint::{ConstraintRule, ParamConstraint};
176 use crate::governance::permission::PermissionRule;
177 use crate::governance::rate_limit::RateLimit;
178 let default = default_action.unwrap_or(PolicyAction::Allow).into();
179 let mut pipeline = crate::governance::pipeline::GovernancePipeline::new(default);
180 for rule in rules {
181 pipeline.permission.add_rule(PermissionRule {
182 tool_pattern: rule.tool_pattern.into(),
183 action: rule.action.into(),
184 });
185 }
186 for tool in vetoed_tools {
187 pipeline.veto.block_tool(tool);
188 }
189 for rl in rate_limits {
190 pipeline.rate_limiter.set_limit(
191 rl.tool,
192 RateLimit {
193 max_calls: rl.max_calls,
194 window_ms: rl.window_ms,
195 },
196 );
197 }
198 for c in constraints {
199 let (tool_name, param_path, rule) = match c {
200 ConstraintSpec::Required { tool, path } => (tool, path, ConstraintRule::Required),
201 ConstraintSpec::Enum { tool, path, values } => (tool, path, ConstraintRule::Enum(values)),
202 ConstraintSpec::Range { tool, path, min, max } => {
203 (tool, path, ConstraintRule::Range { min, max })
204 }
205 };
206 pipeline.constraints.add(ParamConstraint {
207 tool_name,
208 param_path,
209 rule,
210 });
211 }
212 pipeline
213}
214
215#[derive(Debug, Clone, Serialize, Deserialize)]
216#[serde(tag = "kind", rename_all = "snake_case")]
217pub enum KernelInputEvent {
218 SetTools {
219 tools: Vec<ToolSchema>,
220 },
221 SetAvailableSkills {
222 skills: Vec<SkillMetadata>,
223 },
224 SkillActivated {
228 name: String,
229 },
230 SetStableCoreTools {
233 tool_ids: Vec<String>,
234 },
235 SetMemoryEnabled {
236 enabled: bool,
237 },
238 SetKnowledgeEnabled {
239 enabled: bool,
240 },
241 SetPlanToolEnabled {
242 enabled: bool,
243 },
244 SetTokenizer {
245 name: String,
246 },
247 AddSystemMessage {
248 content: String,
249 tokens: u32,
250 },
251 AddKnowledgeMessage {
252 content: String,
253 tokens: u32,
254 },
255 AddHistoryMessage {
256 message: Message,
257 tokens: Option<u32>,
258 },
259 PreloadHistory {
260 messages: Vec<Message>,
261 },
262 MountCapability {
263 capability: CapabilityDescriptor,
264 },
265 UnmountCapability {
266 capability_kind: CapabilityKind,
267 id: String,
268 },
269 LoadMilestoneContract {
270 contract: MilestoneContract,
271 },
272 LoadGovernancePolicy {
276 #[serde(default)]
277 default_action: Option<PolicyAction>,
278 #[serde(default, skip_serializing_if = "Vec::is_empty")]
279 rules: Vec<PolicyRule>,
280 #[serde(default, skip_serializing_if = "Vec::is_empty")]
281 vetoed_tools: Vec<String>,
282 #[serde(default, skip_serializing_if = "Vec::is_empty")]
285 rate_limits: Vec<RateLimitSpec>,
286 #[serde(default, skip_serializing_if = "Vec::is_empty")]
287 constraints: Vec<ConstraintSpec>,
288 },
289 SetAttentionPolicy {
292 #[serde(default = "default_signal_queue_size")]
293 max_queue_size: u32,
294 },
295 ForceCompact,
296 UpdateTask {
297 update: TaskUpdate,
298 },
299 StartRun {
300 task: RuntimeTask,
301 #[serde(default, skip_serializing_if = "Option::is_none")]
302 run_spec: Option<AgentRunSpec>,
303 },
304 ConfigureRun {
309 config: RunConfig,
310 },
311 CapabilityCommand {
312 command: CapabilityCommand,
313 },
314 Resume {
315 #[serde(default, skip_serializing_if = "Vec::is_empty")]
319 approved_calls: Vec<String>,
320 #[serde(default, skip_serializing_if = "Vec::is_empty")]
321 denied_calls: Vec<String>,
322 },
323 SetSchedulerBudget {
327 #[serde(default, skip_serializing_if = "Option::is_none")]
328 max_wall_ms: Option<u64>,
329 },
330 SetResourceQuota {
336 quota: crate::governance::quota::ResourceQuota,
337 },
338 ProviderResult {
339 message: Message,
340 #[serde(default, skip_serializing_if = "Option::is_none")]
341 observed_input_tokens: Option<u32>,
342 #[serde(default, skip_serializing_if = "Option::is_none")]
343 observed_output_tokens: Option<u32>,
344 #[serde(default, skip_serializing_if = "Option::is_none")]
348 now_ms: Option<u64>,
349 },
350 ToolResults {
351 results: Vec<ToolResult>,
352 },
353 Signal {
354 signal: RuntimeSignal,
355 },
356 MilestoneResult {
357 result: MilestoneCheckResult,
358 },
359 SpawnSubAgent {
361 spec: AgentRunSpec,
362 parent_session_id: String,
363 },
364 LoadWorkflow {
369 spec: crate::orchestration::workflow::WorkflowSpec,
370 parent_session_id: String,
371 #[serde(default, skip_serializing_if = "Vec::is_empty")]
373 resumed_completed: Vec<String>,
374 #[serde(default, skip_serializing_if = "Vec::is_empty")]
378 resumed_submissions: Vec<Vec<crate::orchestration::workflow::WorkflowNode>>,
379 },
380 SubAgentCompleted {
382 result: SubAgentResult,
383 },
384 SubmitWorkflowNodes {
389 #[serde(default, skip_serializing_if = "Vec::is_empty")]
390 nodes: Vec<crate::orchestration::workflow::WorkflowNode>,
391 #[serde(default, skip_serializing_if = "Option::is_none")]
395 submitter_agent_id: Option<String>,
396 },
397 SubmitWorkflow {
403 spec: crate::orchestration::workflow::WorkflowSpec,
404 #[serde(default)]
406 parent_session_id: String,
407 #[serde(default, skip_serializing_if = "Option::is_none")]
410 submitter_agent_id: Option<String>,
411 },
412 PageIn {
415 #[serde(default, skip_serializing_if = "Vec::is_empty")]
416 entries: Vec<crate::mm::PageInEntry>,
417 },
418 SetMemoryPolicy {
421 #[serde(default)]
422 memory_path: String,
423 #[serde(default = "default_stale_days")]
424 stale_warning_days: u32,
425 #[serde(default = "default_top_k")]
426 retrieval_top_k: usize,
427 #[serde(default = "default_validation_enabled")]
428 validation_enabled: bool,
429 #[serde(default, skip_serializing_if = "Option::is_none")]
431 max_content_bytes: Option<u32>,
432 #[serde(default, skip_serializing_if = "Option::is_none")]
434 max_name_length: Option<usize>,
435 },
436 WriteMemory {
438 memory: crate::mm::memory::MemoryWriteRequest,
439 },
440 QueryMemory {
442 query: crate::mm::memory::MemoryQuery,
443 },
444 MemoryRetrievalResult {
446 retrieval: crate::mm::memory::MemoryRetrieval,
447 },
448 Timeout,
449}
450
/// Serde default for `KernelInputEvent::SetMemoryPolicy::stale_warning_days`.
fn default_stale_days() -> u32 {
    const DEFAULT_STALE_DAYS: u32 = 2;
    DEFAULT_STALE_DAYS
}
/// Serde default for `KernelInputEvent::SetMemoryPolicy::retrieval_top_k`.
fn default_top_k() -> usize {
    const DEFAULT_TOP_K: usize = 5;
    DEFAULT_TOP_K
}
/// Serde default for `KernelInputEvent::SetMemoryPolicy::validation_enabled`.
fn default_validation_enabled() -> bool {
    const DEFAULT_VALIDATION_ENABLED: bool = true;
    DEFAULT_VALIDATION_ENABLED
}
454
455#[derive(Debug, Clone, Serialize, Deserialize)]
456pub struct KernelStep {
457 pub version: u32,
458 pub actions: Vec<KernelAction>,
459 pub observations: Vec<KernelObservation>,
460}
461
462impl KernelStep {
463 fn empty(observations: Vec<KernelObservation>) -> Self {
464 Self {
465 version: KERNEL_ABI_VERSION,
466 actions: Vec::new(),
467 observations,
468 }
469 }
470
471 fn single(action: LoopAction, observations: Vec<KernelObservation>) -> Self {
472 Self {
473 version: KERNEL_ABI_VERSION,
474 actions: vec![action.into()],
475 observations,
476 }
477 }
478}
479
480#[derive(Debug, Clone, Serialize, Deserialize)]
481#[serde(tag = "kind", rename_all = "snake_case")]
482pub enum KernelAction {
483 CallProvider {
484 context: RenderedContext,
485 tools: Vec<ToolSchema>,
486 },
487 ExecuteTool {
488 calls: Vec<ToolCall>,
489 },
490 EvaluateMilestone {
491 phase_id: String,
492 criteria: Vec<String>,
493 #[serde(default, skip_serializing_if = "Option::is_none")]
494 verifier: Option<crate::types::milestone::MilestoneVerifier>,
495 #[serde(default, skip_serializing_if = "Vec::is_empty")]
496 required_evidence: Vec<String>,
497 },
498 Done {
499 result: LoopResult,
500 },
501}
502
503impl From<LoopAction> for KernelAction {
504 fn from(action: LoopAction) -> Self {
505 match action {
506 LoopAction::AwaitingResume => {
507 panic!("AwaitingResume must not be converted to KernelAction")
508 }
509 LoopAction::CallLLM { context, tools } => Self::CallProvider { context, tools },
510 LoopAction::ExecuteTools { calls } => Self::ExecuteTool { calls },
511 LoopAction::EvaluateMilestone {
512 phase_id,
513 criteria,
514 verifier,
515 required_evidence,
516 } => Self::EvaluateMilestone {
517 phase_id,
518 criteria,
519 verifier,
520 required_evidence,
521 },
522 LoopAction::Done { result } => Self::Done { result },
523 }
524 }
525}
526
527#[derive(Debug, Clone, Serialize, Deserialize)]
528#[serde(tag = "kind", rename_all = "snake_case")]
529pub enum KernelObservation {
530 Compressed {
531 action: KernelPressureAction,
532 rho_after: f64,
533 summary: Option<String>,
534 archived: Vec<Message>,
535 #[serde(default, skip_serializing_if = "Option::is_none")]
539 invalidates_prefix_at: Option<usize>,
540 },
541 Renewed {
542 sprint: u32,
543 },
544 Rollbacked {
545 turn: u32,
546 checkpoint_history_len: u32,
547 #[serde(default, skip_serializing_if = "Option::is_none")]
548 reason: Option<RollbackReason>,
549 },
550 CapabilityChanged {
551 turn: u32,
552 #[serde(default, skip_serializing_if = "Vec::is_empty")]
553 added: Vec<String>,
554 #[serde(default, skip_serializing_if = "Vec::is_empty")]
555 removed: Vec<String>,
556 #[serde(default, skip_serializing_if = "Option::is_none")]
557 change_kind: Option<String>,
558 #[serde(default, skip_serializing_if = "Option::is_none")]
559 capability_id: Option<String>,
560 #[serde(default, skip_serializing_if = "Option::is_none")]
561 version: Option<String>,
562 #[serde(default, skip_serializing_if = "Option::is_none")]
563 mounted_by: Option<String>,
564 #[serde(default, skip_serializing_if = "Option::is_none")]
565 mount_reason: Option<String>,
566 },
567 MilestoneAdvanced {
568 turn: u32,
569 phase_id: String,
570 capabilities_unlocked: Vec<String>,
571 },
572 MilestoneBlocked {
573 turn: u32,
574 phase_id: String,
575 reason: String,
576 },
577 MilestoneEvidence {
579 turn: u32,
580 phase_id: String,
581 #[serde(default, skip_serializing_if = "Vec::is_empty")]
582 evidence: Vec<String>,
583 },
584 CheckpointTaken {
586 turn: u32,
587 history_len: u32,
588 },
589 AgentProcessChanged {
591 turn: u32,
592 agent_id: String,
593 parent_session_id: String,
594 role: String,
595 isolation: String,
596 context_inheritance: String,
597 state: String,
598 #[serde(default, skip_serializing_if = "Vec::is_empty")]
599 permitted_capability_ids: Vec<String>,
600 #[serde(default, skip_serializing_if = "Option::is_none")]
601 result_termination: Option<String>,
602 },
603 WorkflowBatchSpawned {
606 turn: u32,
607 nodes: Vec<crate::orchestration::workflow::WorkflowSpawnInfo>,
608 #[serde(default, skip_serializing_if = "Option::is_none")]
612 budget: Option<crate::orchestration::workflow::WorkflowBudget>,
613 },
614 WorkflowCompleted {
616 turn: u32,
617 #[serde(default, skip_serializing_if = "Vec::is_empty")]
618 completed: Vec<String>,
619 #[serde(default, skip_serializing_if = "Vec::is_empty")]
620 failed: Vec<String>,
621 },
622 AgentPreempted {
628 turn: u32,
629 #[serde(default, skip_serializing_if = "Vec::is_empty")]
630 agent_ids: Vec<String>,
631 reason: String,
632 },
633 ToolGated {
636 turn: u32,
637 call_id: String,
638 tool: String,
639 reason: String,
640 },
641 SignalDisposed {
643 turn: u32,
644 signal_id: String,
645 disposition: String,
646 queue_depth: u32,
647 },
648 BudgetExceeded { turn: u32, budget: String },
650 Suspended {
652 turn: u32,
653 reason: String,
654 #[serde(default, skip_serializing_if = "Vec::is_empty")]
655 pending_calls: Vec<String>,
656 },
657 Resumed {
659 turn: u32,
660 #[serde(default, skip_serializing_if = "Vec::is_empty")]
661 approved: Vec<String>,
662 #[serde(default, skip_serializing_if = "Vec::is_empty")]
663 denied: Vec<String>,
664 },
665 PageOut {
667 turn: u32,
668 action: KernelPressureAction,
669 rho_after: f64,
670 #[serde(default, skip_serializing_if = "Option::is_none")]
671 summary: Option<String>,
672 #[serde(default, skip_serializing_if = "Vec::is_empty")]
673 archived: Vec<Message>,
674 tier_hint: String,
675 },
676 PageInRequested {
678 turn: u32,
679 call_id: String,
680 tool: String,
681 query: String,
682 top_k: u32,
683 },
684 MemoryWritten {
686 turn: u32,
687 memory_id: String,
688 memory_kind: String,
689 size_bytes: u32,
690 },
691 MemoryValidationFailed {
693 turn: u32,
694 memory_id: String,
695 error: String,
696 },
697 MemoryQueried {
699 turn: u32,
700 query_context: String,
701 requested_k: usize,
702 requires_async_response: bool,
703 },
704 LargeResultSpooled {
706 turn: u32,
707 call_id: String,
708 tool: String,
709 original_size: u32,
710 preview_size: u32,
711 spool_ref: Option<String>,
712 },
713}
714
715#[derive(Debug, Clone, Serialize, Deserialize)]
721#[serde(tag = "kind", rename_all = "snake_case")]
722pub enum TransactionObservation {
723 CheckpointTaken { turn: u32, history_len: u32 },
724 Rollbacked {
725 turn: u32,
726 checkpoint_history_len: u32,
727 #[serde(default, skip_serializing_if = "Option::is_none")]
728 reason: Option<crate::runtime::session::RollbackReason>,
729 },
730}
731
732#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
733#[serde(rename_all = "snake_case")]
734pub enum KernelPressureAction {
735 None,
736 SnipCompact,
737 MicroCompact,
738 ContextCollapse,
739 AutoCompact,
740}
741
742impl From<PressureAction> for KernelPressureAction {
743 fn from(action: PressureAction) -> Self {
744 match action {
745 PressureAction::None => Self::None,
746 PressureAction::SnipCompact => Self::SnipCompact,
747 PressureAction::MicroCompact => Self::MicroCompact,
748 PressureAction::ContextCollapse => Self::ContextCollapse,
749 PressureAction::AutoCompact => Self::AutoCompact,
750 }
751 }
752}
753
754pub struct KernelRuntime {
757 sm: LoopStateMachine,
758}
759
760impl KernelRuntime {
761 pub fn new(policy: LoopPolicy) -> Self {
762 Self {
763 sm: LoopStateMachine::new(policy),
764 }
765 }
766
767 pub fn state_machine(&self) -> &LoopStateMachine {
768 &self.sm
769 }
770
771 pub fn state_machine_mut(&mut self) -> &mut LoopStateMachine {
772 &mut self.sm
773 }
774
775 pub fn is_terminal(&self) -> bool {
776 self.sm.is_terminal()
777 }
778
779 pub fn local_subagents_spawned(&self) -> u32 {
782 self.sm.local_subagents_spawned()
783 }
784
785 pub fn step(&mut self, input: KernelInput) -> KernelStep {
786 let action = match input.event {
787 KernelInputEvent::SetTools { tools } => {
788 self.sm.tools = tools;
789 return KernelStep::empty(self.sm.take_observations());
790 }
791 KernelInputEvent::SetAvailableSkills { skills } => {
792 self.sm.ctx.set_available_skills(skills);
793 return KernelStep::empty(self.sm.take_observations());
794 }
795 KernelInputEvent::SkillActivated { name } => {
796 self.sm.ctx.activate_skill(name);
799 return KernelStep::empty(self.sm.take_observations());
800 }
801 KernelInputEvent::SetStableCoreTools { tool_ids } => {
802 self.sm.ctx.set_stable_core_tools(tool_ids.into_iter().map(Into::into));
803 return KernelStep::empty(self.sm.take_observations());
804 }
805 KernelInputEvent::SetMemoryEnabled { enabled } => {
806 self.sm.ctx.set_memory_enabled(enabled);
807 return KernelStep::empty(self.sm.take_observations());
808 }
809 KernelInputEvent::SetKnowledgeEnabled { enabled } => {
810 self.sm.ctx.set_knowledge_enabled(enabled);
811 return KernelStep::empty(self.sm.take_observations());
812 }
813 KernelInputEvent::SetPlanToolEnabled { enabled } => {
814 self.sm.ctx.set_plan_tool_enabled(enabled);
815 return KernelStep::empty(self.sm.take_observations());
816 }
817 KernelInputEvent::SetTokenizer { .. } => {
818 self.sm.ctx.engine = ContextTokenEngine::char_approx();
822 return KernelStep::empty(self.sm.take_observations());
823 }
824 KernelInputEvent::AddSystemMessage { content, tokens } => {
825 self.sm
826 .ctx
827 .partitions
828 .system
829 .push(Message::system(content), tokens.max(1));
830 return KernelStep::empty(self.sm.take_observations());
831 }
832 KernelInputEvent::AddKnowledgeMessage { content, tokens } => {
833 self.sm.ctx.partitions.knowledge.push(Message::system(content), tokens.max(1));
842 return KernelStep::empty(self.sm.take_observations());
843 }
844 KernelInputEvent::AddHistoryMessage { message, tokens } => {
845 let tokens = tokens.unwrap_or_else(|| self.sm.ctx.engine.count_message(&message));
846 self.sm.ctx.push_history(message, tokens.max(1));
847 return KernelStep::empty(self.sm.take_observations());
848 }
849 KernelInputEvent::PreloadHistory { messages } => {
850 self.sm.preload_history(messages);
851 return KernelStep::empty(self.sm.take_observations());
852 }
853 KernelInputEvent::MountCapability { capability } => {
854 self.sm.mount_capability(capability, None, None);
855 return KernelStep::empty(self.sm.take_observations());
856 }
857 KernelInputEvent::UnmountCapability {
858 capability_kind,
859 id,
860 } => {
861 self.sm.unmount_capability(capability_kind, &id);
862 return KernelStep::empty(self.sm.take_observations());
863 }
864 KernelInputEvent::LoadMilestoneContract { contract } => {
865 self.sm.load_milestone_contract(contract);
866 return KernelStep::empty(self.sm.take_observations());
867 }
868 KernelInputEvent::LoadGovernancePolicy {
869 default_action,
870 rules,
871 vetoed_tools,
872 rate_limits,
873 constraints,
874 } => {
875 self.sm.set_governance(build_governance_pipeline(
876 default_action,
877 rules,
878 vetoed_tools,
879 rate_limits,
880 constraints,
881 ));
882 return KernelStep::empty(self.sm.take_observations());
883 }
884 KernelInputEvent::ConfigureRun { config } => {
885 let RunConfig {
892 tools,
893 available_skills,
894 stable_core_tools,
895 memory_enabled,
896 knowledge_enabled,
897 plan_tool_enabled,
898 tokenizer,
899 governance,
900 attention_max_queue_size,
901 scheduler_max_wall_ms,
902 resource_quota,
903 group_tokens_base,
904 group_spawns_base,
905 } = config;
906 if let Some(tools) = tools {
907 self.sm.tools = tools;
908 }
909 if let Some(skills) = available_skills {
910 self.sm.ctx.set_available_skills(skills);
911 }
912 if let Some(ids) = stable_core_tools {
913 self.sm.ctx.set_stable_core_tools(ids.into_iter().map(Into::into));
914 }
915 if let Some(enabled) = memory_enabled {
916 self.sm.ctx.set_memory_enabled(enabled);
917 }
918 if let Some(enabled) = knowledge_enabled {
919 self.sm.ctx.set_knowledge_enabled(enabled);
920 }
921 if let Some(enabled) = plan_tool_enabled {
922 self.sm.ctx.set_plan_tool_enabled(enabled);
923 }
924 if tokenizer.is_some() {
925 self.sm.ctx.engine = ContextTokenEngine::char_approx();
926 }
927 if let Some(g) = governance {
928 self.sm.set_governance(build_governance_pipeline(
929 g.default_action,
930 g.rules,
931 g.vetoed_tools,
932 g.rate_limits,
933 g.constraints,
934 ));
935 }
936 if let Some(max_queue) = attention_max_queue_size {
937 self.sm.set_attention(max_queue as usize);
938 }
939 if let Some(ms) = scheduler_max_wall_ms {
940 self.sm.set_wall_budget(Some(ms));
941 }
942 if let Some(quota) = resource_quota {
943 self.sm.set_resource_quota(quota);
944 }
945 if let Some(base) = group_tokens_base {
946 self.sm.seed_group_budget(base);
947 }
948 if let Some(base) = group_spawns_base {
949 self.sm.seed_group_spawns(base);
950 }
951 return KernelStep::empty(self.sm.take_observations());
952 }
953 KernelInputEvent::SetAttentionPolicy { max_queue_size } => {
954 self.sm.set_attention(max_queue_size as usize);
955 return KernelStep::empty(self.sm.take_observations());
956 }
957 KernelInputEvent::PageIn { entries } => {
958 self.sm.apply_page_in(&entries);
959 return KernelStep::empty(self.sm.take_observations());
960 }
961 KernelInputEvent::ForceCompact => {
962 self.sm.force_compact();
963 return KernelStep::empty(self.sm.take_observations());
964 }
965 KernelInputEvent::UpdateTask { update } => {
966 self.sm.ctx.update_task(update);
967 return KernelStep::empty(self.sm.take_observations());
968 }
969 KernelInputEvent::StartRun { task, run_spec } => {
970 self.sm.run_spec = run_spec;
971 self.sm.start(task)
972 }
973 KernelInputEvent::CapabilityCommand { command } => {
974 self.sm.execute_capability_command(command);
975 return KernelStep::empty(self.sm.take_observations());
976 }
977 KernelInputEvent::Resume { approved_calls, denied_calls } => {
978 let action = self.sm.resume_from_suspend(approved_calls, denied_calls);
979 if matches!(action, LoopAction::AwaitingResume) {
980 return KernelStep::empty(self.sm.take_observations());
981 }
982 return KernelStep::single(action, self.sm.take_observations());
983 }
984 KernelInputEvent::SetSchedulerBudget { max_wall_ms } => {
985 self.sm.set_wall_budget(max_wall_ms);
986 return KernelStep::empty(self.sm.take_observations());
987 }
988 KernelInputEvent::SetResourceQuota { quota } => {
989 self.sm.set_resource_quota(quota);
990 return KernelStep::empty(self.sm.take_observations());
991 }
992 KernelInputEvent::ProviderResult {
993 message,
994 observed_input_tokens,
995 observed_output_tokens: _,
996 now_ms,
997 } => {
998 if let Some(tokens) = observed_input_tokens {
999 self.sm.ctx.set_observed_prompt_tokens(tokens);
1000 }
1001 if let Some(ms) = now_ms {
1004 self.sm.set_observed_time(ms);
1005 }
1006 self.sm.feed(LoopEvent::LLMResponse { message })
1007 }
1008 KernelInputEvent::ToolResults { results } => {
1009 self.sm.feed(LoopEvent::ToolResults { results })
1010 }
1011 KernelInputEvent::Signal { signal } => match self.sm.signal_event(signal) {
1012 Some(action) => action,
1013 None => return KernelStep::empty(self.sm.take_observations()),
1016 },
1017 KernelInputEvent::MilestoneResult { result } => {
1018 self.sm.feed(LoopEvent::MilestoneResult { result })
1019 }
1020 KernelInputEvent::SpawnSubAgent {
1021 spec,
1022 parent_session_id,
1023 } => {
1024 let action = self.sm.spawn_sub_agent(spec, &parent_session_id);
1025 if matches!(action, LoopAction::AwaitingResume) {
1026 return KernelStep::empty(self.sm.take_observations());
1027 }
1028 return KernelStep::single(action, self.sm.take_observations());
1029 }
1030 KernelInputEvent::LoadWorkflow {
1031 spec,
1032 parent_session_id,
1033 resumed_completed,
1034 resumed_submissions,
1035 } => {
1036 self.sm.ensure_started_for_workflow(&spec);
1040 let action = if resumed_completed.is_empty() && resumed_submissions.is_empty() {
1041 self.sm.load_workflow(spec, &parent_session_id)
1042 } else {
1043 self.sm.load_workflow_resumed(
1044 spec,
1045 &parent_session_id,
1046 &resumed_submissions,
1047 &resumed_completed,
1048 )
1049 };
1050 if matches!(action, LoopAction::AwaitingResume) {
1051 return KernelStep::empty(self.sm.take_observations());
1052 }
1053 return KernelStep::single(action, self.sm.take_observations());
1054 }
1055 KernelInputEvent::SubAgentCompleted { result } => {
1056 self.sm.feed(LoopEvent::SubAgentCompleted { result })
1057 }
1058 KernelInputEvent::SubmitWorkflowNodes {
1059 nodes,
1060 submitter_agent_id,
1061 } => {
1062 let action = self
1063 .sm
1064 .submit_workflow_nodes(nodes, submitter_agent_id.as_deref());
1065 if matches!(action, LoopAction::AwaitingResume) {
1066 return KernelStep::empty(self.sm.take_observations());
1067 }
1068 return KernelStep::single(action, self.sm.take_observations());
1069 }
1070 KernelInputEvent::SubmitWorkflow {
1071 spec,
1072 parent_session_id,
1073 submitter_agent_id,
1074 } => {
1075 let action = self.sm.submit_workflow(
1076 spec,
1077 &parent_session_id,
1078 submitter_agent_id.as_deref(),
1079 );
1080 if matches!(action, LoopAction::AwaitingResume) {
1081 return KernelStep::empty(self.sm.take_observations());
1082 }
1083 return KernelStep::single(action, self.sm.take_observations());
1084 }
1085 KernelInputEvent::SetMemoryPolicy {
1086 memory_path,
1087 stale_warning_days,
1088 retrieval_top_k,
1089 validation_enabled,
1090 max_content_bytes,
1091 max_name_length,
1092 } => {
1093 self.sm.set_memory_policy(crate::mm::memory::MemoryPolicy {
1097 memory_path,
1098 stale_warning_days,
1099 retrieval_top_k,
1100 validation_enabled,
1101 max_content_bytes,
1102 max_name_length,
1103 });
1104 return KernelStep::empty(self.sm.take_observations());
1105 }
1106 KernelInputEvent::WriteMemory { memory } => {
1107 use crate::mm::memory::validate_memory_write;
1110 let turn = self.sm.turn;
1111 let disposition = self
1115 .sm
1116 .gate_syscall(&crate::syscall::Syscall::WriteMemory(memory.clone()));
1117 if !disposition.is_allowed() {
1118 let error = match disposition {
1119 crate::syscall::Disposition::RateLimited { retry_after_ms } => {
1120 format!("memory write rate limited; retry after {retry_after_ms}ms")
1121 }
1122 crate::syscall::Disposition::Deny { reason, .. } => {
1123 format!("memory write denied: {reason}")
1124 }
1125 _ => "memory write not permitted".to_string(),
1126 };
1127 self.sm.observations.push(
1128 KernelObservation::MemoryValidationFailed {
1129 turn,
1130 memory_id: memory.metadata.name.clone(),
1131 error,
1132 },
1133 );
1134 return KernelStep::empty(self.sm.take_observations());
1135 }
1136 let validation_result = match self.sm.memory_policy() {
1140 Some(p) if !p.validation_enabled => Ok(()),
1141 Some(p) => p.validation().validate(&memory),
1142 None => validate_memory_write(&memory),
1143 };
1144 match validation_result {
1145 Ok(()) => {
1146 self.sm.observations.push(KernelObservation::MemoryWritten {
1148 turn,
1149 memory_id: memory.metadata.name.clone(),
1150 memory_kind: memory.metadata.kind.map(|k| k.label()).unwrap_or_else(|| {
1151 crate::mm::memory::MemoryKind::infer_from_metadata(&memory.metadata).label()
1152 }).to_string(),
1153 size_bytes: memory.content.len() as u32,
1154 });
1155 }
1156 Err(err) => {
1157 use crate::mm::memory::MemoryValidationError;
1159 let error_msg = match err {
1160 MemoryValidationError::MissingRequiredField { field } => format!("Missing required field: {}", field),
1161 MemoryValidationError::ContentTooLarge { size, limit } => format!("Content too large: {} bytes (limit: {})", size, limit),
1162 MemoryValidationError::ForbiddenPattern { pattern, reason } => format!("Forbidden pattern '{}': {}", pattern, reason),
1163 MemoryValidationError::InvalidKind { kind } => format!("Invalid kind: {}", kind),
1164 MemoryValidationError::NameTooLong { length, limit } => format!("Name too long: {} chars (limit: {})", length, limit),
1165 };
1166 self.sm.observations.push(KernelObservation::MemoryValidationFailed {
1167 turn,
1168 memory_id: memory.metadata.name.clone(),
1169 error: error_msg,
1170 });
1171 }
1172 }
1173 return KernelStep::empty(self.sm.take_observations());
1174 }
1175 KernelInputEvent::QueryMemory { query } => {
1176 let turn = self.sm.turn;
1179 let requested_k = match self.sm.memory_policy() {
1181 Some(p) => p.clamp_top_k(query.top_k),
1182 None => query.top_k,
1183 };
1184 self.sm.observations.push(KernelObservation::MemoryQueried {
1185 turn,
1186 query_context: query.current_context.clone(),
1187 requested_k,
1188 requires_async_response: true,
1189 });
1190 return KernelStep::empty(self.sm.take_observations());
1191 }
1192 KernelInputEvent::MemoryRetrievalResult { .. } => {
1193 return KernelStep::empty(self.sm.take_observations());
1195 }
1196 KernelInputEvent::Timeout => self.sm.feed(LoopEvent::Timeout),
1197 };
1198 if matches!(action, LoopAction::AwaitingResume) {
1199 return KernelStep::empty(self.sm.take_observations());
1200 }
1201 KernelStep::single(action, self.sm.take_observations())
1202 }
1203}
1204
1205#[cfg(test)]
1206mod tests {
1207 use super::*;
1208
/// `StartRun` on a fresh runtime yields a single `CallProvider` action in a
/// step whose version equals `KERNEL_ABI_VERSION`.
#[test]
fn start_run_returns_versioned_provider_action() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let start = KernelInputEvent::StartRun {
        task: RuntimeTask::new("ship it"),
        run_spec: None,
    };

    let outcome = kernel.step(KernelInput::new(start));

    assert_eq!(outcome.version, KERNEL_ABI_VERSION);
    let emitted = outcome.actions.as_slice();
    assert!(matches!(emitted, [KernelAction::CallProvider { .. }]));
}
1223
/// After a run has started, a provider result built from plain assistant text
/// produces exactly one `Done` action.
#[test]
fn provider_text_response_returns_done() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());

    let start = KernelInputEvent::StartRun {
        task: RuntimeTask::new("ship it"),
        run_spec: None,
    };
    kernel.step(KernelInput::new(start));

    let reply = KernelInputEvent::ProviderResult {
        message: Message::assistant("done"),
        observed_input_tokens: None,
        observed_output_tokens: None,
        now_ms: None,
    };
    let outcome = kernel.step(KernelInput::new(reply));

    let emitted = outcome.actions.as_slice();
    assert!(matches!(emitted, [KernelAction::Done { .. }]));
}
1243
/// `SetTools` emits no actions and leaves one tool schema registered on the
/// state machine.
#[test]
fn config_inputs_mutate_runtime_without_actions() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let echo = ToolSchema {
        name: "echo".into(),
        description: "Echo input".to_string(),
        parameters: serde_json::json!({"type": "object"}),
    };

    let outcome = kernel.step(KernelInput::new(KernelInputEvent::SetTools {
        tools: vec![echo],
    }));

    assert!(outcome.actions.is_empty());
    let registered = kernel.state_machine().tools.len();
    assert_eq!(registered, 1);
}
1258
/// Activating an available skill marks it active without emitting actions;
/// the active-skill tool filter then holds the skill's two allowed tools.
#[test]
fn skill_activated_input_records_active_skill() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());

    // Make the skill available before activating it.
    let mut skill = SkillMetadata::new("debug", "Debug helper");
    skill.allowed_tools = vec!["read".into(), "grep".into()];
    let register = KernelInputEvent::SetAvailableSkills {
        skills: vec![skill],
    };
    kernel.step(KernelInput::new(register));

    let activate = KernelInputEvent::SkillActivated {
        name: "debug".to_string(),
    };
    let outcome = kernel.step(KernelInput::new(activate));

    assert!(outcome.actions.is_empty(), "activation is config, not an action");
    let is_active = kernel.state_machine().ctx.active_skills.contains("debug");
    assert!(is_active);
    let allowed = kernel.state_machine().ctx.active_skill_tool_filter().unwrap();
    assert_eq!(allowed.len(), 2);
}
1279
/// `UpdateTask` stores the supplied progress text in the task-state partition
/// and emits no actions.
#[test]
fn update_task_input_mutates_task_state() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let update = TaskUpdate {
        progress: Some("tools executed".to_string()),
        ..Default::default()
    };

    let outcome = kernel.step(KernelInput::new(KernelInputEvent::UpdateTask { update }));

    assert!(outcome.actions.is_empty());
    let task_state = &kernel.state_machine().ctx.partitions.task_state;
    assert_eq!(task_state.progress, "tools executed");
}
1296
/// `AddKnowledgeMessage` emits no actions and leaves one message in the
/// knowledge partition.
#[test]
fn add_knowledge_message_enters_knowledge_partition() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let add = KernelInputEvent::AddKnowledgeMessage {
        content: "skill: debug".to_string(),
        tokens: 10,
    };

    let outcome = kernel.step(KernelInput::new(add));

    assert!(outcome.actions.is_empty());
    let stored = kernel
        .state_machine()
        .ctx
        .partitions
        .knowledge
        .messages
        .len();
    assert_eq!(stored, 1);
}
1311
/// Mounting a capability emits no actions and exactly one
/// `CapabilityChanged` observation.
#[test]
fn capability_mount_emits_observation() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let docs_server = CapabilityDescriptor::marker(
        CapabilityKind::McpServer,
        "docs",
        "Documentation server",
    );

    let outcome = kernel.step(KernelInput::new(KernelInputEvent::MountCapability {
        capability: docs_server,
    }));

    assert!(outcome.actions.is_empty());
    let observed = outcome.observations.as_slice();
    assert!(matches!(
        observed,
        [KernelObservation::CapabilityChanged { .. }]
    ));
}
1329
/// Spawning a sub-agent during a run registers a running `worker` process
/// under the parent session and suspends the parent on a sub-agent join.
#[test]
fn spawn_sub_agent_input_registers_process() {
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    kernel.step(KernelInput::new(KernelInputEvent::StartRun {
        task: RuntimeTask::new("parent task"),
        run_spec: None,
    }));
    // Discard anything still buffered so only spawn observations are inspected.
    kernel.state_machine_mut().take_observations();

    let worker = AgentRunSpec::new(
        AgentIdentity::sub_agent("worker", "worker-session"),
        AgentRole::Implement,
        "do work",
    );
    let spawn = KernelInputEvent::SpawnSubAgent {
        spec: worker,
        parent_session_id: "parent-session".to_string(),
    };
    let outcome = kernel.step(KernelInput::new(spawn));

    assert!(outcome.actions.is_empty());

    let announced_running = outcome.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::AgentProcessChanged {
                agent_id,
                parent_session_id,
                state,
                ..
            } if agent_id == "worker" && parent_session_id == "parent-session" && state == "running"
        )
    });
    assert!(announced_running);

    let process = kernel
        .state_machine()
        .agent_process("worker")
        .expect("process");
    assert_eq!(process.parent_session_id.as_str(), "parent-session");

    let announced_suspend = outcome.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::Suspended { reason, .. } if reason == "sub_agent_await"
        )
    });
    assert!(announced_suspend);

    assert!(kernel.state_machine().is_suspended());
    assert!(matches!(
        kernel.state_machine().wait_reason(),
        Some(crate::scheduler::tcb::WaitReason::SubAgentJoin(_))
    ));
}
1380
/// With `max_spawn_depth` set to zero, a `SpawnSubAgent` request registers no
/// worker process, leaves the parent unsuspended, and yields a provider call.
#[test]
fn set_resource_quota_input_denies_spawn_over_quota() {
    use crate::governance::quota::ResourceQuota;
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};

    let mut kernel = KernelRuntime::new(LoopPolicy::default());

    let quota = ResourceQuota {
        max_spawn_depth: Some(0),
        ..ResourceQuota::default()
    };
    let configured =
        kernel.step(KernelInput::new(KernelInputEvent::SetResourceQuota { quota }));
    assert!(configured.actions.is_empty(), "config input yields no actions");

    kernel.step(KernelInput::new(KernelInputEvent::StartRun {
        task: RuntimeTask::new("parent task"),
        run_spec: None,
    }));
    kernel.state_machine_mut().take_observations();

    let worker = AgentRunSpec::new(
        AgentIdentity::sub_agent("worker", "worker-session"),
        AgentRole::Implement,
        "do work",
    );
    let outcome = kernel.step(KernelInput::new(KernelInputEvent::SpawnSubAgent {
        spec: worker,
        parent_session_id: "parent-session".to_string(),
    }));

    let emitted = outcome.actions.as_slice();
    assert!(matches!(emitted, [KernelAction::CallProvider { .. }]));

    let worker_announced = outcome.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::AgentProcessChanged { agent_id, .. } if agent_id == "worker"
        )
    });
    assert!(!worker_announced);

    assert!(kernel.state_machine().agent_process("worker").is_none());
    assert!(!kernel.state_machine().is_suspended());
}
1422
/// Under a 100-token cap, a group token base of 95 makes one turn report
/// `token_budget` as exceeded, while the same turn with no base does not.
#[test]
fn group_budget_base_enforces_shared_token_cap() {
    use crate::types::message::{Content, Message, ToolCall, ToolResult};

    // True when the step carries a `BudgetExceeded` observation for `token_budget`.
    fn token_budget_exceeded(step: &KernelStep) -> bool {
        step.observations.iter().any(|obs| {
            matches!(obs, KernelObservation::BudgetExceeded { budget, .. } if budget == "token_budget")
        })
    }

    // Runs one provider turn (a 10-token message calling `echo`) followed by
    // its tool result, and returns the step produced by the tool results.
    fn run_one_turn(group_base: Option<u64>) -> KernelStep {
        let policy = LoopPolicy {
            max_total_tokens: 100,
            ..LoopPolicy::default()
        };
        let mut kernel = KernelRuntime::new(policy);

        let config = RunConfig {
            group_tokens_base: group_base,
            ..RunConfig::default()
        };
        kernel.step(KernelInput::new(KernelInputEvent::ConfigureRun { config }));
        kernel.step(KernelInput::new(KernelInputEvent::StartRun {
            task: RuntimeTask::new("task"),
            run_spec: None,
        }));

        let mut reply = Message::assistant("");
        reply.token_count = Some(10);
        reply.tool_calls.push(ToolCall {
            id: "c1".into(),
            name: "echo".into(),
            arguments: serde_json::json!({}),
        });
        kernel.step(KernelInput::new(KernelInputEvent::ProviderResult {
            message: reply,
            observed_input_tokens: None,
            observed_output_tokens: None,
            now_ms: None,
        }));

        let echoed = ToolResult {
            call_id: "c1".into(),
            output: Content::Text("ok".into()),
            is_error: false,
            is_fatal: false,
            error_kind: None,
            token_count: None,
        };
        kernel.step(KernelInput::new(KernelInputEvent::ToolResults {
            results: vec![echoed],
        }))
    }

    assert!(
        token_budget_exceeded(&run_one_turn(Some(95))),
        "group token budget must span the whole domain"
    );
    assert!(
        !token_budget_exceeded(&run_one_turn(None)),
        "no group seed ⇒ per-vehicle budget, well under cap"
    );
}
1484
/// When the group spawn base already equals `max_total_subagents`, a further
/// spawn registers no worker and the local spawn count stays at zero.
#[test]
fn group_spawns_base_enforces_cumulative_spawn_cap() {
    use crate::governance::quota::ResourceQuota;
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};

    let mut kernel = KernelRuntime::new(LoopPolicy::default());

    let quota = ResourceQuota {
        max_total_subagents: Some(2),
        ..ResourceQuota::default()
    };
    let config = RunConfig {
        resource_quota: Some(quota),
        group_spawns_base: Some(2),
        ..RunConfig::default()
    };
    kernel.step(KernelInput::new(KernelInputEvent::ConfigureRun { config }));
    kernel.step(KernelInput::new(KernelInputEvent::StartRun {
        task: RuntimeTask::new("task"),
        run_spec: None,
    }));
    kernel.state_machine_mut().take_observations();

    let worker = AgentRunSpec::new(
        AgentIdentity::sub_agent("worker", "worker-session"),
        AgentRole::Implement,
        "do work",
    );
    let outcome = kernel.step(KernelInput::new(KernelInputEvent::SpawnSubAgent {
        spec: worker,
        parent_session_id: "parent-session".to_string(),
    }));

    let emitted = outcome.actions.as_slice();
    assert!(matches!(emitted, [KernelAction::CallProvider { .. }]));
    assert!(kernel.state_machine().agent_process("worker").is_none());
    assert_eq!(kernel.local_subagents_spawned(), 0);
}
1527
/// With no quota configured, a spawn request registers the worker process
/// and suspends the parent.
#[test]
fn default_runtime_leaves_spawn_unquota_ed() {
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let start = KernelInputEvent::StartRun {
        task: RuntimeTask::new("parent task"),
        run_spec: None,
    };
    kernel.step(KernelInput::new(start));
    kernel.state_machine_mut().take_observations();

    let worker = AgentRunSpec::new(
        AgentIdentity::sub_agent("worker", "worker-session"),
        AgentRole::Implement,
        "do work",
    );
    let spawn = KernelInputEvent::SpawnSubAgent {
        spec: worker,
        parent_session_id: "parent-session".to_string(),
    };
    kernel.step(KernelInput::new(spawn));

    assert!(kernel.state_machine().agent_process("worker").is_some());
    assert!(kernel.state_machine().is_suspended());
}
1552
/// Pins the JSON form of `AgentProcessChanged`: multi-word values serialize
/// as single lowercase words (`readonly`, `systemonly`).
#[test]
fn agent_process_changed_locks_multiword_wire_form() {
    use crate::types::agent::{AgentIdentity, AgentIsolation, AgentRole, AgentRunSpec};

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    kernel.step(KernelInput::new(KernelInputEvent::StartRun {
        task: RuntimeTask::new("parent task"),
        run_spec: None,
    }));
    kernel.state_machine_mut().take_observations();

    let identity = AgentIdentity::sub_agent("worker", "worker-session");
    let verifier = AgentRunSpec::new(identity, AgentRole::Verify, "do work")
        .with_isolation(AgentIsolation::ReadOnly);
    let outcome = kernel.step(KernelInput::new(KernelInputEvent::SpawnSubAgent {
        spec: verifier,
        parent_session_id: "parent-session".to_string(),
    }));

    let changed = outcome
        .observations
        .iter()
        .find(|obs| matches!(obs, KernelObservation::AgentProcessChanged { .. }))
        .expect("agent_process_changed observation");
    let wire = serde_json::to_value(changed).unwrap();

    assert_eq!(wire["isolation"], "readonly", "isolation must stay debug-lowercase");
    assert_eq!(
        wire["context_inheritance"], "systemonly",
        "context_inheritance must stay debug-lowercase"
    );
    assert_eq!(wire["role"], "verify");
    assert_eq!(wire["state"], "running");
}
1594
1595 fn write_memory(runtime: &mut KernelRuntime, name: &str, content: &str) -> KernelStep {
1598 use crate::mm::memory::{MemoryMetadata, MemoryWriteRequest};
1599 runtime.step(KernelInput::new(KernelInputEvent::WriteMemory {
1600 memory: MemoryWriteRequest {
1601 metadata: MemoryMetadata {
1602 name: name.to_string(),
1603 description: "desc".to_string(),
1604 ..Default::default()
1605 },
1606 content: content.to_string(),
1607 },
1608 }))
1609 }
1610
/// With `validation_enabled: false`, a write reports `MemoryWritten` and no
/// `MemoryValidationFailed`.
#[test]
fn memory_policy_validation_disabled_admits_forbidden_write() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let policy = KernelInputEvent::SetMemoryPolicy {
        memory_path: String::new(),
        stale_warning_days: 2,
        retrieval_top_k: 5,
        validation_enabled: false,
        max_content_bytes: None,
        max_name_length: None,
    };
    kernel.step(KernelInput::new(policy));

    let outcome = write_memory(&mut kernel, "note", "代码模式: foo");

    let written = outcome
        .observations
        .iter()
        .any(|obs| matches!(obs, KernelObservation::MemoryWritten { .. }));
    assert!(written);
    let rejected = outcome
        .observations
        .iter()
        .any(|obs| matches!(obs, KernelObservation::MemoryValidationFailed { .. }));
    assert!(!rejected);
}
1633
/// Without any memory policy set, the same write reports
/// `MemoryValidationFailed`.
#[test]
fn default_runtime_validates_forbidden_write() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());

    let outcome = write_memory(&mut kernel, "note", "代码模式: foo");

    let rejected = outcome
        .observations
        .iter()
        .any(|obs| matches!(obs, KernelObservation::MemoryValidationFailed { .. }));
    assert!(rejected);
}
1644
/// A policy with `max_content_bytes: Some(8)` rejects longer content with a
/// validation error containing "too large".
#[test]
fn memory_policy_size_override_rejects_oversized_write() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let policy = KernelInputEvent::SetMemoryPolicy {
        memory_path: String::new(),
        stale_warning_days: 2,
        retrieval_top_k: 5,
        validation_enabled: true,
        max_content_bytes: Some(8),
        max_name_length: None,
    };
    kernel.step(KernelInput::new(policy));

    let outcome = write_memory(&mut kernel, "note", "this content is well over eight bytes");

    // First validation failure message in the step, if any.
    let failure = outcome.observations.iter().find_map(|obs| {
        if let KernelObservation::MemoryValidationFailed { error, .. } = obs {
            Some(error.clone())
        } else {
            None
        }
    });
    assert!(failure.is_some_and(|message| message.contains("too large")));
}
1663
/// With `retrieval_top_k: 3` in the policy, a query asking for 50 results is
/// reported with `requested_k == 3`.
#[test]
fn memory_policy_clamps_retrieval_top_k() {
    use crate::mm::memory::MemoryQuery;

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let policy = KernelInputEvent::SetMemoryPolicy {
        memory_path: String::new(),
        stale_warning_days: 2,
        retrieval_top_k: 3,
        validation_enabled: true,
        max_content_bytes: None,
        max_name_length: None,
    };
    kernel.step(KernelInput::new(policy));

    let query = MemoryQuery {
        top_k: 50,
        ..Default::default()
    };
    let outcome = kernel.step(KernelInput::new(KernelInputEvent::QueryMemory { query }));

    let reported_k = outcome.observations.iter().find_map(|obs| {
        if let KernelObservation::MemoryQueried { requested_k, .. } = obs {
            Some(*requested_k)
        } else {
            None
        }
    });
    assert_eq!(reported_k, Some(3));
}
1685
/// Without a memory policy, the query's own `top_k` (50) is reported
/// unchanged as `requested_k`.
#[test]
fn default_runtime_uses_requested_top_k_verbatim() {
    use crate::mm::memory::MemoryQuery;

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let query = MemoryQuery {
        top_k: 50,
        ..Default::default()
    };

    let outcome = kernel.step(KernelInput::new(KernelInputEvent::QueryMemory { query }));

    let reported_k = outcome.observations.iter().find_map(|obs| {
        if let KernelObservation::MemoryQueried { requested_k, .. } = obs {
            Some(*requested_k)
        } else {
            None
        }
    });
    assert_eq!(reported_k, Some(50));
}
1699
/// With `max_wall_ms: Some(10)` and a provider result stamped `now_ms: 100`,
/// the step after the tool results is a `CallProvider` with an empty tool list.
#[test]
fn provider_result_now_ms_drives_wall_time_budget() {
    let policy = LoopPolicy {
        max_wall_ms: Some(10),
        ..LoopPolicy::default()
    };
    let mut kernel = KernelRuntime::new(policy);
    kernel.step(KernelInput::new(KernelInputEvent::StartRun {
        task: RuntimeTask::new("ship it"),
        run_spec: None,
    }));

    let mut reply = Message::assistant("");
    reply.tool_calls.push(ToolCall {
        id: "call-1".into(),
        name: "echo".into(),
        arguments: serde_json::json!({}),
    });
    kernel.step(KernelInput::new(KernelInputEvent::ProviderResult {
        message: reply,
        observed_input_tokens: None,
        observed_output_tokens: None,
        now_ms: Some(100),
    }));

    let echoed = ToolResult {
        call_id: "call-1".into(),
        output: crate::types::message::Content::Text("ok".into()),
        is_error: false,
        is_fatal: false,
        error_kind: None,
        token_count: None,
    };
    let outcome = kernel.step(KernelInput::new(KernelInputEvent::ToolResults {
        results: vec![echoed],
    }));

    let emitted = outcome.actions.as_slice();
    assert!(matches!(
        emitted,
        [KernelAction::CallProvider { tools, .. }] if tools.is_empty()
    ));
}
1738
1739 fn assistant_calling(tool: &str) -> Message {
1742 let mut msg = Message::assistant("");
1743 msg.tool_calls.push(ToolCall {
1744 id: "call-1".into(),
1745 name: tool.into(),
1746 arguments: serde_json::json!({}),
1747 });
1748 msg
1749 }
1750
1751 fn run_with_tool_call(runtime: &mut KernelRuntime, tool: &str) -> KernelStep {
1753 run_with_tool_call_named(runtime, tool, "call-1")
1754 }
1755
1756 fn run_with_tool_call_named(
1757 runtime: &mut KernelRuntime,
1758 tool: &str,
1759 call_id: &str,
1760 ) -> KernelStep {
1761 runtime.step(KernelInput::new(KernelInputEvent::StartRun {
1762 task: RuntimeTask::new("do the thing"),
1763 run_spec: None,
1764 }));
1765 runtime.state_machine_mut().take_observations();
1766 runtime.step(KernelInput::new(KernelInputEvent::ProviderResult {
1767 message: assistant_calling(tool),
1768 observed_input_tokens: None,
1769 observed_output_tokens: None,
1770 now_ms: None,
1771 }))
1772 }
1773
/// A `Deny` rule matching the called tool leads to a `Rollbacked` observation
/// and a fresh `CallProvider` action.
#[test]
fn governance_deny_blocks_tool_and_reprompts() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let deny_danger = PolicyRule {
        tool_pattern: "danger.*".to_string(),
        action: PolicyAction::Deny,
    };
    kernel.step(KernelInput::new(KernelInputEvent::LoadGovernancePolicy {
        default_action: Some(PolicyAction::Allow),
        rules: vec![deny_danger],
        vetoed_tools: vec![],
        rate_limits: vec![],
        constraints: vec![],
    }));

    let outcome = run_with_tool_call(&mut kernel, "danger.delete");

    assert!(
        matches!(outcome.actions.as_slice(), [KernelAction::CallProvider { .. }]),
        "denied tool should roll back and re-call provider, got {:?}",
        outcome.actions
    );
    let rolled_back = outcome
        .observations
        .iter()
        .any(|obs| matches!(obs, KernelObservation::Rollbacked { .. }));
    assert!(
        rolled_back,
        "expected a Rollbacked observation for the denied turn",
    );
}
1803
/// The same `Deny` rule supplied through a `ConfigureRun` bundle also yields
/// a `Rollbacked` observation and a `CallProvider` action.
#[test]
fn configure_run_bundle_applies_governance_equivalently_to_load_governance_policy() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());

    let deny_danger = PolicyRule {
        tool_pattern: "danger.*".to_string(),
        action: PolicyAction::Deny,
    };
    let governance = GovernanceConfig {
        default_action: Some(PolicyAction::Allow),
        rules: vec![deny_danger],
        ..GovernanceConfig::default()
    };
    let config = RunConfig {
        tools: Some(vec![]),
        governance: Some(governance),
        attention_max_queue_size: Some(32),
        ..RunConfig::default()
    };
    kernel.step(KernelInput::new(KernelInputEvent::ConfigureRun { config }));

    let outcome = run_with_tool_call(&mut kernel, "danger.delete");

    assert!(
        matches!(outcome.actions.as_slice(), [KernelAction::CallProvider { .. }]),
        "bundle-configured deny should roll back and re-call provider, got {:?}",
        outcome.actions
    );
    let rolled_back = outcome
        .observations
        .iter()
        .any(|obs| matches!(obs, KernelObservation::Rollbacked { .. }));
    assert!(
        rolled_back,
        "expected a Rollbacked observation for the bundle-denied turn",
    );
}
1839
/// A `ConfigureRun` event serializes to JSON and parses back as the same
/// variant.
#[test]
fn configure_run_round_trips_over_the_abi() {
    let quota = crate::governance::quota::ResourceQuota {
        max_concurrent_subagents: Some(2),
        ..Default::default()
    };
    let config = RunConfig {
        resource_quota: Some(quota),
        scheduler_max_wall_ms: Some(60_000),
        plan_tool_enabled: Some(true),
        ..RunConfig::default()
    };
    let original = KernelInputEvent::ConfigureRun { config };

    let encoded = serde_json::to_string(&original).expect("serialize");
    let decoded: KernelInputEvent = serde_json::from_str(&encoded).expect("deserialize");

    assert!(matches!(decoded, KernelInputEvent::ConfigureRun { .. }));
}
1859
/// An `AskUser` rule gates the tool call and suspends the run with no
/// actions; resuming with the call approved then emits `ExecuteTool`.
#[test]
fn governance_ask_user_suspends_until_resume() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let ask_on_sensitive = PolicyRule {
        tool_pattern: "sensitive.*".to_string(),
        action: PolicyAction::AskUser,
    };
    kernel.step(KernelInput::new(KernelInputEvent::LoadGovernancePolicy {
        default_action: Some(PolicyAction::Allow),
        rules: vec![ask_on_sensitive],
        vetoed_tools: vec![],
        rate_limits: vec![],
        constraints: vec![],
    }));

    let gated = run_with_tool_call(&mut kernel, "sensitive.read");

    assert!(
        gated.actions.is_empty(),
        "AskUser should suspend without ExecuteTool, got {:?}",
        gated.actions
    );
    let tool_gated = gated.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::ToolGated { tool, .. } if tool == "sensitive.read"
        )
    });
    assert!(
        tool_gated,
        "expected a ToolGated observation for the AskUser call",
    );
    let suspended = gated.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::Suspended { reason, .. } if reason == "ask_user"
        )
    });
    assert!(suspended, "expected a Suspended observation",);

    let approval = KernelInputEvent::Resume {
        approved_calls: vec!["call-1".to_string()],
        denied_calls: vec![],
    };
    let resumed = kernel.step(KernelInput::new(approval));

    assert!(
        matches!(resumed.actions.as_slice(), [KernelAction::ExecuteTool { .. }]),
        "resume with approval should emit ExecuteTool, got {:?}",
        resumed.actions
    );
    let resume_reported = resumed.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::Resumed { approved, denied, .. }
                if approved == &["call-1"] && denied.is_empty()
        )
    });
    assert!(resume_reported,);
}
1913
/// Resuming a gated call with every call denied yields a `CallProvider`
/// action.
#[test]
fn governance_ask_user_resume_all_denied_feeds_tool_results() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let ask_on_sensitive = PolicyRule {
        tool_pattern: "sensitive.*".to_string(),
        action: PolicyAction::AskUser,
    };
    kernel.step(KernelInput::new(KernelInputEvent::LoadGovernancePolicy {
        default_action: Some(PolicyAction::Allow),
        rules: vec![ask_on_sensitive],
        vetoed_tools: vec![],
        rate_limits: vec![],
        constraints: vec![],
    }));
    run_with_tool_call(&mut kernel, "sensitive.read");
    kernel.state_machine_mut().take_observations();

    let refusal = KernelInputEvent::Resume {
        approved_calls: vec![],
        denied_calls: vec!["call-1".to_string()],
    };
    let outcome = kernel.step(KernelInput::new(refusal));

    assert!(
        matches!(outcome.actions.as_slice(), [KernelAction::CallProvider { .. }]),
        "all denied should re-prompt provider, got {:?}",
        outcome.actions
    );
}
1940
/// With no governance policy loaded, a tool call goes straight to
/// `ExecuteTool` and no `ToolGated` observation is emitted.
#[test]
fn no_governance_policy_executes_all_tools() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());

    let outcome = run_with_tool_call(&mut kernel, "danger.delete");

    let emitted = outcome.actions.as_slice();
    assert!(matches!(emitted, [KernelAction::ExecuteTool { .. }]));
    let gated = outcome
        .observations
        .iter()
        .any(|obs| matches!(obs, KernelObservation::ToolGated { .. }));
    assert!(!gated,);
}
1958
1959 fn tool_ok(call_id: &str) -> ToolResult {
1960 ToolResult {
1961 call_id: call_id.into(),
1962 output: crate::types::message::Content::Text("ok".to_string()),
1963 is_error: false,
1964 is_fatal: false,
1965 error_kind: None,
1966 token_count: None,
1967 }
1968 }
1969
/// With `fetch` limited to one call per 60 s window, the first call executes
/// and a second call one millisecond later is rolled back and re-prompted.
#[test]
fn governance_rate_limit_blocks_second_call() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let one_fetch_per_minute = RateLimitSpec {
        tool: "fetch".to_string(),
        max_calls: 1,
        window_ms: 60_000,
    };
    kernel.step(KernelInput::new(KernelInputEvent::LoadGovernancePolicy {
        default_action: Some(PolicyAction::Allow),
        rules: vec![],
        vetoed_tools: vec![],
        rate_limits: vec![one_fetch_per_minute],
        constraints: vec![],
    }));
    kernel.step(KernelInput::new(KernelInputEvent::StartRun {
        task: RuntimeTask::new("fetch twice"),
        run_spec: None,
    }));
    kernel.state_machine_mut().take_observations();

    // First call at t = 1000 ms.
    let first = kernel.step(KernelInput::new(KernelInputEvent::ProviderResult {
        message: assistant_calling("fetch"),
        observed_input_tokens: None,
        observed_output_tokens: None,
        now_ms: Some(1_000),
    }));
    assert!(
        matches!(first.actions.as_slice(), [KernelAction::ExecuteTool { .. }]),
        "first call should execute, got {:?}",
        first.actions
    );

    // Complete the first tool call before the provider asks again.
    kernel.step(KernelInput::new(KernelInputEvent::ToolResults {
        results: vec![tool_ok("call-1")],
    }));
    kernel.state_machine_mut().take_observations();

    // Second call at t = 1001 ms.
    let second = kernel.step(KernelInput::new(KernelInputEvent::ProviderResult {
        message: assistant_calling("fetch"),
        observed_input_tokens: None,
        observed_output_tokens: None,
        now_ms: Some(1_001),
    }));
    assert!(
        matches!(second.actions.as_slice(), [KernelAction::CallProvider { .. }]),
        "rate-limited call should roll back and re-call provider, got {:?}",
        second.actions
    );
    let rolled_back = second
        .observations
        .iter()
        .any(|obs| matches!(obs, KernelObservation::Rollbacked { .. }));
    assert!(
        rolled_back,
        "expected a Rollbacked observation for the rate-limited turn",
    );
}
2028
/// A `Required` constraint on `write`'s `path`, with a call that passes empty
/// arguments, yields a `Rollbacked` observation and a `CallProvider` action.
#[test]
fn governance_constraint_required_param_denies() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let path_required = ConstraintSpec::Required {
        tool: "write".to_string(),
        path: "path".to_string(),
    };
    kernel.step(KernelInput::new(KernelInputEvent::LoadGovernancePolicy {
        default_action: Some(PolicyAction::Allow),
        rules: vec![],
        vetoed_tools: vec![],
        rate_limits: vec![],
        constraints: vec![path_required],
    }));

    let outcome = run_with_tool_call(&mut kernel, "write");

    assert!(
        matches!(outcome.actions.as_slice(), [KernelAction::CallProvider { .. }]),
        "missing required param should roll back, got {:?}",
        outcome.actions
    );
    let rolled_back = outcome
        .observations
        .iter()
        .any(|obs| matches!(obs, KernelObservation::Rollbacked { .. }));
    assert!(
        rolled_back,
        "expected a Rollbacked observation for the constraint violation",
    );
}
2057
2058 fn signal(urgency: crate::types::signal::Urgency, summary: &str) -> crate::types::signal::RuntimeSignal {
2061 use crate::types::signal::{RuntimeSignal, SignalSource, SignalType};
2062 RuntimeSignal::new(SignalSource::Gateway, SignalType::Alert, urgency, summary)
2063 }
2064
2065 fn started_runtime_with_attention(max_queue: u32) -> KernelRuntime {
2066 let mut runtime = KernelRuntime::new(LoopPolicy::default());
2067 runtime.step(KernelInput::new(KernelInputEvent::SetAttentionPolicy {
2068 max_queue_size: max_queue,
2069 }));
2070 runtime.step(KernelInput::new(KernelInputEvent::StartRun {
2071 task: RuntimeTask::new("watch for signals"),
2072 run_spec: None,
2073 }));
2074 runtime.state_machine_mut().take_observations();
2075 runtime
2076 }
2077
/// A critical signal is disposed as `interrupt_now` and produces a
/// `CallProvider` action.
#[test]
fn attention_policy_critical_signal_interrupts() {
    use crate::types::signal::Urgency;

    let mut kernel = started_runtime_with_attention(8);
    let alarm = signal(Urgency::Critical, "fire");

    let outcome = kernel.step(KernelInput::new(KernelInputEvent::Signal { signal: alarm }));

    assert!(
        matches!(outcome.actions.as_slice(), [KernelAction::CallProvider { .. }]),
        "critical signal should drive a provider call, got {:?}",
        outcome.actions
    );
    let interrupted = outcome.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::SignalDisposed { disposition, .. } if disposition == "interrupt_now"
        )
    });
    assert!(interrupted);
}
2095
/// A normal-urgency signal is disposed as `queue` at depth 1 and produces no
/// actions.
#[test]
fn attention_policy_normal_signal_queues_without_action() {
    use crate::types::signal::Urgency;

    let mut kernel = started_runtime_with_attention(8);
    let routine = signal(Urgency::Normal, "job");

    let outcome = kernel.step(KernelInput::new(KernelInputEvent::Signal { signal: routine }));

    assert!(
        outcome.actions.is_empty(),
        "normal signal should queue without a provider call, got {:?}",
        outcome.actions
    );
    let queued_first = outcome.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::SignalDisposed { disposition, queue_depth, .. }
                if disposition == "queue" && *queue_depth == 1
        )
    });
    assert!(queued_first);
}
2114
/// With a queue size of one, a second normal signal is disposed as `dropped`.
#[test]
fn attention_policy_full_queue_drops() {
    use crate::types::signal::Urgency;

    let mut kernel = started_runtime_with_attention(1);

    let first = signal(Urgency::Normal, "first");
    kernel.step(KernelInput::new(KernelInputEvent::Signal { signal: first }));

    let second = signal(Urgency::Normal, "second");
    let outcome = kernel.step(KernelInput::new(KernelInputEvent::Signal { signal: second }));

    let dropped = outcome.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::SignalDisposed { disposition, .. } if disposition == "dropped"
        )
    });
    assert!(dropped);
}
2130
/// With memory enabled, a `PageIn` entry grows the knowledge partition's
/// message list.
///
/// Carries a single `#[test]` attribute: a duplicated attribute registers the
/// test twice and triggers rustc's `duplicate_macro_attributes` lint.
#[test]
fn page_in_populates_knowledge_partition() {
    let mut runtime = KernelRuntime::new(LoopPolicy::default());
    runtime.step(KernelInput::new(KernelInputEvent::SetMemoryEnabled {
        enabled: true,
    }));

    // Snapshot the knowledge message count before paging in.
    let before = runtime
        .state_machine()
        .ctx
        .partitions
        .knowledge
        .messages
        .len();

    runtime.step(KernelInput::new(KernelInputEvent::PageIn {
        entries: vec![crate::mm::PageInEntry {
            content: "[memory] prior fix".to_string(),
            tokens: Some(10),
            source: Some("memory".to_string()),
        }],
    }));

    let after = runtime
        .state_machine()
        .ctx
        .partitions
        .knowledge
        .messages
        .len();
    assert!(after > before, "page-in should add knowledge messages");
}
2161
/// With memory enabled, a call to the `memory` tool produces a
/// `PageInRequested` observation naming that tool.
#[test]
fn memory_tool_emits_page_in_requested() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());

    let enable = KernelInputEvent::SetMemoryEnabled { enabled: true };
    kernel.step(KernelInput::new(enable));

    let start = KernelInputEvent::StartRun {
        task: RuntimeTask::new("test"),
        run_spec: None,
    };
    kernel.step(KernelInput::new(start));
    kernel.state_machine_mut().take_observations();

    // NOTE(review): `run_with_tool_call` issues its own `StartRun`, so the run
    // is started a second time here; kept as in the original test.
    let outcome = run_with_tool_call(&mut kernel, "memory");

    let requested = outcome.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::PageInRequested { tool, .. } if tool == "memory"
        )
    });
    assert!(requested);
}
2180
/// Drives a fan-out/synthesize workflow end to end: the `LoadWorkflow` event
/// survives a JSON round trip, two nodes are spawned first, a single
/// follow-up node (`wf-node2`) is spawned once both finish, and completion
/// reports three nodes.
#[test]
fn load_workflow_input_drives_dag_to_completion() {
    use crate::orchestration::workflow::fanout_synthesize;
    use crate::types::result::{LoopResult, SubAgentResult, TerminationReason};

    // Reports sub-agent `id` as finished with `Completed` and returns the step.
    fn complete(runtime: &mut KernelRuntime, id: &str) -> KernelStep {
        let finished = LoopResult {
            termination: TerminationReason::Completed,
            final_message: None,
            turns_used: 1,
            total_tokens_used: 1,
            loop_continue: None,
            classify_branch: None,
            tournament_winner: None,
        };
        runtime.step(KernelInput::new(KernelInputEvent::SubAgentCompleted {
            result: SubAgentResult {
                agent_id: compact_str::CompactString::new(id),
                result: finished,
            },
        }))
    }

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    kernel.step(KernelInput::new(KernelInputEvent::StartRun {
        task: RuntimeTask::new("parent task"),
        run_spec: None,
    }));
    kernel.state_machine_mut().take_observations();

    let workers = vec![RuntimeTask::new("w0"), RuntimeTask::new("w1")];
    let spec = fanout_synthesize(workers, RuntimeTask::new("synth"));
    let event = KernelInputEvent::LoadWorkflow {
        spec,
        parent_session_id: "sess".to_string(),
        resumed_completed: Vec::new(),
        resumed_submissions: Vec::new(),
    };

    // Send the event through JSON and feed the parsed copy to the runtime.
    let encoded = serde_json::to_string(&event).expect("serialize");
    let decoded: KernelInputEvent = serde_json::from_str(&encoded).expect("deserialize");
    let loaded = kernel.step(KernelInput::new(decoded));

    let first_batch = loaded
        .observations
        .iter()
        .find_map(|obs| {
            if let KernelObservation::WorkflowBatchSpawned { nodes, .. } = obs {
                Some(nodes.clone())
            } else {
                None
            }
        })
        .expect("workflow_batch_spawned");
    assert_eq!(first_batch.len(), 2);
    let goals: Vec<&str> = first_batch.iter().map(|node| node.goal.as_str()).collect();
    assert!(goals.contains(&"w0") && goals.contains(&"w1"));
    assert_eq!(first_batch[0].agent_id, "wf-node0");
    assert_eq!(first_batch[0].isolation, "read_only");

    complete(&mut kernel, "wf-node0");
    let after_workers = complete(&mut kernel, "wf-node1");
    let follow_up_spawned = after_workers.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::WorkflowBatchSpawned { nodes, .. }
                if nodes.len() == 1 && nodes[0].agent_id == "wf-node2"
        )
    });
    assert!(follow_up_spawned);

    let after_last = complete(&mut kernel, "wf-node2");
    let workflow_done = after_last.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::WorkflowCompleted { completed, .. } if completed.len() == 3
        )
    });
    assert!(workflow_done);
}
2254
/// Sends `LoadWorkflow` as the very first input of a fresh kernel (no
/// `StartRun` beforehand) and checks that the workflow still spawns its first
/// batch and reaches `WorkflowCompleted` with three completed nodes.
#[test]
fn load_workflow_self_bootstraps_with_no_prior_start_run() {
    use crate::orchestration::workflow::fanout_synthesize;
    use crate::types::result::{LoopResult, SubAgentResult, TerminationReason};

    // Builds the input event reporting that sub-agent `agent` finished with
    // `TerminationReason::Completed`.
    let completion_of = |agent: &str| KernelInputEvent::SubAgentCompleted {
        result: SubAgentResult {
            agent_id: compact_str::CompactString::new(agent),
            result: LoopResult {
                termination: TerminationReason::Completed,
                final_message: None,
                turns_used: 1,
                total_tokens_used: 1,
                loop_continue: None,
                classify_branch: None,
                tournament_winner: None,
            },
        },
    };

    // Intentionally no `StartRun` step here.
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let workers = vec![RuntimeTask::new("w0"), RuntimeTask::new("w1")];
    let load_step = kernel.step(KernelInput::new(KernelInputEvent::LoadWorkflow {
        spec: fanout_synthesize(workers, RuntimeTask::new("synth")),
        parent_session_id: String::from("sess"),
        resumed_completed: Vec::new(),
        resumed_submissions: Vec::new(),
    }));
    let first_batch = load_step
        .observations
        .iter()
        .find_map(|obs| {
            if let KernelObservation::WorkflowBatchSpawned { nodes, .. } = obs {
                Some(nodes.clone())
            } else {
                None
            }
        })
        .expect("workflow_batch_spawned even without a prior StartRun");
    assert_eq!(first_batch.len(), 2);

    // Complete the first two nodes; only the final step's observations are checked.
    for agent in ["wf-node0", "wf-node1"] {
        kernel.step(KernelInput::new(completion_of(agent)));
    }
    let last_step = kernel.step(KernelInput::new(completion_of("wf-node2")));
    let finished = last_step.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::WorkflowCompleted { completed, .. } if completed.len() == 3
        )
    });
    assert!(finished);
}
2308
/// Loads a one-node workflow, then submits one more node through a JSON
/// round-tripped `SubmitWorkflowNodes` event.
///
/// Asserts that the submitted node is spawned as `wf-node1` with goal `more`,
/// and that completing `wf-node0` and `wf-node1` yields `WorkflowCompleted`
/// listing two completed nodes.
#[test]
fn submit_workflow_nodes_input_appends_a_node_over_the_abi() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;
    use crate::types::result::{LoopResult, SubAgentResult, TerminationReason};

    // Builds the input event reporting that sub-agent `agent` finished with
    // `TerminationReason::Completed`.
    let completion_of = |agent: &str| KernelInputEvent::SubAgentCompleted {
        result: SubAgentResult {
            agent_id: compact_str::CompactString::new(agent),
            result: LoopResult {
                termination: TerminationReason::Completed,
                final_message: None,
                turns_used: 1,
                total_tokens_used: 1,
                loop_continue: None,
                classify_branch: None,
                tournament_winner: None,
            },
        },
    };

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    kernel.step(KernelInput::new(KernelInputEvent::StartRun {
        task: RuntimeTask::new("parent task"),
        run_spec: None,
    }));
    kernel.state_machine_mut().take_observations();

    // Seed the kernel with a workflow containing a single `root` node.
    let root = WorkflowNode::new(RuntimeTask::new("root"), AgentRole::Implement);
    kernel.step(KernelInput::new(KernelInputEvent::LoadWorkflow {
        spec: WorkflowSpec::new(vec![root]),
        parent_session_id: String::from("sess"),
        resumed_completed: Vec::new(),
        resumed_submissions: Vec::new(),
    }));
    // Drop the observations from loading so only the submission's are inspected.
    kernel.state_machine_mut().take_observations();

    let extra = WorkflowNode::new(RuntimeTask::new("more"), AgentRole::Implement);
    let submit = KernelInputEvent::SubmitWorkflowNodes {
        nodes: vec![extra],
        submitter_agent_id: None,
    };
    // Serialize and parse back so the event is exercised in its serialized form.
    let wire = serde_json::to_string(&submit).expect("serialize");
    let submit: KernelInputEvent = serde_json::from_str(&wire).expect("deserialize");
    let submit_step = kernel.step(KernelInput::new(submit));
    let appended = submit_step.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::WorkflowBatchSpawned { nodes, .. }
                if nodes.len() == 1 && nodes[0].agent_id == "wf-node1" && nodes[0].goal == "more"
        )
    });
    assert!(appended);

    kernel.step(KernelInput::new(completion_of("wf-node0")));
    let last_step = kernel.step(KernelInput::new(completion_of("wf-node1")));
    let finished = last_step.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::WorkflowCompleted { completed, .. } if completed.len() == 2
        )
    });
    assert!(finished);
}
2376
/// Submits a whole one-node workflow through a JSON round-tripped
/// `SubmitWorkflow` event.
///
/// Asserts that the node is spawned as `wf-node0` with goal `authored root`,
/// and that completing it yields `WorkflowCompleted` listing one completed node.
#[test]
fn submit_workflow_input_bootstraps_a_dag_over_the_abi() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;
    use crate::types::result::{LoopResult, SubAgentResult, TerminationReason};

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    kernel.step(KernelInput::new(KernelInputEvent::StartRun {
        task: RuntimeTask::new("parent task"),
        run_spec: None,
    }));
    kernel.state_machine_mut().take_observations();

    let root = WorkflowNode::new(RuntimeTask::new("authored root"), AgentRole::Implement);
    let submit = KernelInputEvent::SubmitWorkflow {
        spec: WorkflowSpec::new(vec![root]),
        parent_session_id: String::from("sess"),
        submitter_agent_id: None,
    };
    // Serialize and parse back so the event is exercised in its serialized form.
    let wire = serde_json::to_string(&submit).expect("serialize");
    let submit: KernelInputEvent = serde_json::from_str(&wire).expect("deserialize");
    let submit_step = kernel.step(KernelInput::new(submit));
    let root_spawned = submit_step.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::WorkflowBatchSpawned { nodes, .. }
                if nodes.len() == 1 && nodes[0].agent_id == "wf-node0" && nodes[0].goal == "authored root"
        )
    });
    assert!(root_spawned);

    // Report the only node as completed; that must finish the workflow.
    let root_done = KernelInputEvent::SubAgentCompleted {
        result: SubAgentResult {
            agent_id: compact_str::CompactString::new("wf-node0"),
            result: LoopResult {
                termination: TerminationReason::Completed,
                final_message: None,
                turns_used: 1,
                total_tokens_used: 1,
                loop_continue: None,
                classify_branch: None,
                tournament_winner: None,
            },
        },
    };
    let last_step = kernel.step(KernelInput::new(root_done));
    let finished = last_step.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::WorkflowCompleted { completed, .. } if completed.len() == 1
        )
    });
    assert!(finished);
}
2431
/// Loads a workflow with `wf-node0` already listed in `resumed_completed` and
/// checks that the first spawned batch contains only `wf-node1`.
#[test]
fn load_workflow_resumes_from_completed_nodes() {
    use crate::orchestration::workflow::fanout_synthesize;

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    kernel.step(KernelInput::new(KernelInputEvent::StartRun {
        task: RuntimeTask::new("parent task"),
        run_spec: None,
    }));
    kernel.state_machine_mut().take_observations();

    let workers = vec![RuntimeTask::new("w0"), RuntimeTask::new("w1")];
    let resume = KernelInputEvent::LoadWorkflow {
        spec: fanout_synthesize(workers, RuntimeTask::new("synth")),
        parent_session_id: String::from("sess"),
        // Mark the first node as already finished before the load.
        resumed_completed: vec![String::from("wf-node0")],
        resumed_submissions: Vec::new(),
    };
    let resume_step = kernel.step(KernelInput::new(resume));

    let spawned = resume_step
        .observations
        .iter()
        .find_map(|obs| {
            if let KernelObservation::WorkflowBatchSpawned { nodes, .. } = obs {
                Some(nodes.clone())
            } else {
                None
            }
        })
        .expect("workflow_batch_spawned");
    assert_eq!(spawned.len(), 1);
    assert_eq!(spawned[0].agent_id, "wf-node1");
}
2465}