1use serde::{Deserialize, Serialize};
9
10use crate::context::pressure::PressureAction;
11use crate::context::renderer::RenderedContext;
12use crate::context::task_state::TaskUpdate;
13use crate::context::token_engine::ContextTokenEngine;
14use crate::runtime::session::RollbackReason;
15use crate::scheduler::policy::LoopPolicy;
16use crate::scheduler::state_machine::{LoopAction, LoopEvent, LoopStateMachine};
17use crate::types::agent::AgentRunSpec;
18use crate::types::capability::{CapabilityCommand, CapabilityDescriptor, CapabilityKind};
19use crate::types::message::{Message, ToolCall, ToolResult, ToolSchema};
20use crate::types::milestone::{MilestoneCheckResult, MilestoneContract};
21use crate::types::result::{LoopResult, SubAgentResult};
22use crate::types::signal::RuntimeSignal;
23use crate::types::skill::SkillMetadata;
24use crate::types::task::RuntimeTask;
25
/// ABI version stamped into every [`KernelInput`] built with `KernelInput::new`
/// and into every [`KernelStep`] constructed in this module.
pub const KERNEL_ABI_VERSION: u32 = 1;
27
/// Wire-level governance decision.
///
/// Serialized in snake_case (`allow`, `deny`, `ask_user`) and convertible into
/// `crate::governance::permission::PermissionAction` via `From`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PolicyAction {
    Allow,
    Deny,
    AskUser,
}
38
39impl From<PolicyAction> for crate::governance::permission::PermissionAction {
40 fn from(action: PolicyAction) -> Self {
41 match action {
42 PolicyAction::Allow => Self::Allow,
43 PolicyAction::Deny => Self::Deny,
44 PolicyAction::AskUser => Self::AskUser,
45 }
46 }
47}
48
/// One permission rule of a governance policy.
///
/// `build_governance_pipeline` turns each rule into a `PermissionRule` and
/// registers it on the pipeline's permission stage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PolicyRule {
    // NOTE(review): matching semantics of the pattern (glob/prefix/exact) are
    // defined by the permission module, not here — confirm before documenting.
    pub tool_pattern: String,
    /// Decision associated with `tool_pattern`.
    pub action: PolicyAction,
}
55
/// Rate-limit description for a single tool.
///
/// `build_governance_pipeline` registers it on the pipeline's rate limiter as
/// a `RateLimit { max_calls, window_ms }` keyed by `tool`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RateLimitSpec {
    pub tool: String,
    // Presumably the number of calls permitted per window — enforcement lives
    // in the rate-limit module; verify there.
    pub max_calls: u32,
    /// Window length in milliseconds (per the field name).
    pub window_ms: u64,
}
64
/// Parameter constraint attached to a tool.
///
/// Internally tagged on the wire: the `kind` field carries the snake_case
/// variant name (`required`, `enum`, `range`). `build_governance_pipeline`
/// converts each spec into a `ParamConstraint` with the matching
/// `ConstraintRule`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ConstraintSpec {
    /// Becomes `ConstraintRule::Required` for `path` on `tool`.
    Required { tool: String, path: String },
    /// Becomes `ConstraintRule::Enum(values)` for `path` on `tool`.
    Enum {
        tool: String,
        path: String,
        values: Vec<String>,
    },
    /// Becomes `ConstraintRule::Range { min, max }`; either bound may be absent.
    Range {
        tool: String,
        path: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        min: Option<f64>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        max: Option<f64>,
    },
}
89
/// Serde default for `KernelInputEvent::SetAttentionPolicy::max_queue_size`.
fn default_signal_queue_size() -> u32 {
    const DEFAULT_SIGNAL_QUEUE_SIZE: u32 = 64;
    DEFAULT_SIGNAL_QUEUE_SIZE
}
93
/// Versioned envelope around a single [`KernelInputEvent`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KernelInput {
    /// ABI version of the sender; `KernelInput::new` sets it to
    /// [`KERNEL_ABI_VERSION`].
    pub version: u32,
    /// The event to feed into the runtime.
    pub event: KernelInputEvent,
}
99
100impl KernelInput {
101 pub fn new(event: KernelInputEvent) -> Self {
102 Self {
103 version: KERNEL_ABI_VERSION,
104 event,
105 }
106 }
107}
108
/// Governance section of a [`RunConfig`].
///
/// When present in a `ConfigureRun` event, its fields are passed to
/// `build_governance_pipeline` and the resulting pipeline is installed on the
/// state machine. All fields default when missing from the input, and empty
/// or `None` fields are omitted when serializing.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct GovernanceConfig {
    /// Fallback decision; `build_governance_pipeline` treats `None` as
    /// `PolicyAction::Allow`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub default_action: Option<PolicyAction>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub rules: Vec<PolicyRule>,
    /// Tools passed to the pipeline's veto stage via `block_tool`.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub vetoed_tools: Vec<String>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub rate_limits: Vec<RateLimitSpec>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub constraints: Vec<ConstraintSpec>,
}
124
/// Batched configuration carried by `KernelInputEvent::ConfigureRun`.
///
/// Every field is optional: `KernelRuntime::step` applies only the fields that
/// are `Some` and leaves the corresponding runtime settings untouched
/// otherwise. `None` fields are omitted when serializing.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RunConfig {
    /// Replaces the state machine's tool list.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tools: Option<Vec<ToolSchema>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub available_skills: Option<Vec<SkillMetadata>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub stable_core_tools: Option<Vec<String>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub memory_enabled: Option<bool>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub knowledge_enabled: Option<bool>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plan_tool_enabled: Option<bool>,
    /// Any `Some` value currently selects `ContextTokenEngine::char_approx()`;
    /// the string itself is not inspected by `KernelRuntime::step`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tokenizer: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub governance: Option<GovernanceConfig>,
    /// Forwarded to the state machine's `set_attention` as a `usize`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub attention_max_queue_size: Option<u32>,
    /// Forwarded to the state machine's `set_wall_budget` as `Some(ms)`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub scheduler_max_wall_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub resource_quota: Option<crate::governance::quota::ResourceQuota>,
}
154
155pub(crate) fn build_governance_pipeline(
159 default_action: Option<PolicyAction>,
160 rules: Vec<PolicyRule>,
161 vetoed_tools: Vec<String>,
162 rate_limits: Vec<RateLimitSpec>,
163 constraints: Vec<ConstraintSpec>,
164) -> crate::governance::pipeline::GovernancePipeline {
165 use crate::governance::constraint::{ConstraintRule, ParamConstraint};
166 use crate::governance::permission::PermissionRule;
167 use crate::governance::rate_limit::RateLimit;
168 let default = default_action.unwrap_or(PolicyAction::Allow).into();
169 let mut pipeline = crate::governance::pipeline::GovernancePipeline::new(default);
170 for rule in rules {
171 pipeline.permission.add_rule(PermissionRule {
172 tool_pattern: rule.tool_pattern.into(),
173 action: rule.action.into(),
174 });
175 }
176 for tool in vetoed_tools {
177 pipeline.veto.block_tool(tool);
178 }
179 for rl in rate_limits {
180 pipeline.rate_limiter.set_limit(
181 rl.tool,
182 RateLimit {
183 max_calls: rl.max_calls,
184 window_ms: rl.window_ms,
185 },
186 );
187 }
188 for c in constraints {
189 let (tool_name, param_path, rule) = match c {
190 ConstraintSpec::Required { tool, path } => (tool, path, ConstraintRule::Required),
191 ConstraintSpec::Enum { tool, path, values } => (tool, path, ConstraintRule::Enum(values)),
192 ConstraintSpec::Range { tool, path, min, max } => {
193 (tool, path, ConstraintRule::Range { min, max })
194 }
195 };
196 pipeline.constraints.add(ParamConstraint {
197 tool_name,
198 param_path,
199 rule,
200 });
201 }
202 pipeline
203}
204
/// Every event that can be fed into `KernelRuntime::step`.
///
/// Internally tagged on the wire: the `kind` field carries the snake_case
/// variant name. The per-variant notes below describe what
/// `KernelRuntime::step` does with the event.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum KernelInputEvent {
    /// Replaces the state machine's tool list.
    SetTools {
        tools: Vec<ToolSchema>,
    },
    /// Forwards the skill list to the context.
    SetAvailableSkills {
        skills: Vec<SkillMetadata>,
    },
    /// Activates the named skill on the context.
    SkillActivated {
        name: String,
    },
    /// Forwards the tool ids to the context's stable-core-tool setting.
    SetStableCoreTools {
        tool_ids: Vec<String>,
    },
    SetMemoryEnabled {
        enabled: bool,
    },
    SetKnowledgeEnabled {
        enabled: bool,
    },
    SetPlanToolEnabled {
        enabled: bool,
    },
    /// Currently always installs `ContextTokenEngine::char_approx()`; `name`
    /// is not inspected by `step`.
    SetTokenizer {
        name: String,
    },
    /// Pushes a system message onto the system partition; `tokens` is raised
    /// to at least 1.
    AddSystemMessage {
        content: String,
        tokens: u32,
    },
    /// Pushes a system message onto the knowledge partition; `tokens` is
    /// raised to at least 1.
    AddKnowledgeMessage {
        content: String,
        tokens: u32,
    },
    /// Appends a message to history. When `tokens` is `None` the context's
    /// token engine counts the message; the count is raised to at least 1.
    AddHistoryMessage {
        message: Message,
        tokens: Option<u32>,
    },
    PreloadHistory {
        messages: Vec<Message>,
    },
    MountCapability {
        capability: CapabilityDescriptor,
    },
    UnmountCapability {
        capability_kind: CapabilityKind,
        id: String,
    },
    LoadMilestoneContract {
        contract: MilestoneContract,
    },
    /// Builds a governance pipeline from the fields (see
    /// `build_governance_pipeline`) and installs it on the state machine.
    LoadGovernancePolicy {
        #[serde(default)]
        default_action: Option<PolicyAction>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        rules: Vec<PolicyRule>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        vetoed_tools: Vec<String>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        rate_limits: Vec<RateLimitSpec>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        constraints: Vec<ConstraintSpec>,
    },
    /// `max_queue_size` defaults to `default_signal_queue_size()` when absent.
    SetAttentionPolicy {
        #[serde(default = "default_signal_queue_size")]
        max_queue_size: u32,
    },
    ForceCompact,
    UpdateTask {
        update: TaskUpdate,
    },
    /// Stores `run_spec` on the state machine, then starts it with `task`.
    StartRun {
        task: RuntimeTask,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        run_spec: Option<AgentRunSpec>,
    },
    /// Applies every `Some` field of the config in one step.
    ConfigureRun {
        config: RunConfig,
    },
    CapabilityCommand {
        command: CapabilityCommand,
    },
    /// Resumes the state machine from a suspension with the given call ids.
    Resume {
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        approved_calls: Vec<String>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        denied_calls: Vec<String>,
    },
    SetSchedulerBudget {
        #[serde(default, skip_serializing_if = "Option::is_none")]
        max_wall_ms: Option<u64>,
    },
    SetResourceQuota {
        quota: crate::governance::quota::ResourceQuota,
    },
    /// Provider response. `observed_input_tokens` and `now_ms` are recorded
    /// when present; `observed_output_tokens` is currently ignored by `step`.
    ProviderResult {
        message: Message,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        observed_input_tokens: Option<u32>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        observed_output_tokens: Option<u32>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        now_ms: Option<u64>,
    },
    ToolResults {
        results: Vec<ToolResult>,
    },
    Signal {
        signal: RuntimeSignal,
    },
    MilestoneResult {
        result: MilestoneCheckResult,
    },
    SpawnSubAgent {
        spec: AgentRunSpec,
        parent_session_id: String,
    },
    /// Loads a workflow. When both `resumed_*` lists are empty a fresh load is
    /// performed; otherwise the resumed-load path is used.
    LoadWorkflow {
        spec: crate::orchestration::workflow::WorkflowSpec,
        parent_session_id: String,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        resumed_completed: Vec<String>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        resumed_submissions: Vec<Vec<crate::orchestration::workflow::WorkflowNode>>,
    },
    SubAgentCompleted {
        result: SubAgentResult,
    },
    SubmitWorkflowNodes {
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        nodes: Vec<crate::orchestration::workflow::WorkflowNode>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        submitter_agent_id: Option<String>,
    },
    SubmitWorkflow {
        spec: crate::orchestration::workflow::WorkflowSpec,
        #[serde(default)]
        parent_session_id: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        submitter_agent_id: Option<String>,
    },
    PageIn {
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        entries: Vec<crate::mm::PageInEntry>,
    },
    /// Installs a `MemoryPolicy` built field-for-field from this event.
    SetMemoryPolicy {
        #[serde(default)]
        memory_path: String,
        #[serde(default = "default_stale_days")]
        stale_warning_days: u32,
        #[serde(default = "default_top_k")]
        retrieval_top_k: usize,
        #[serde(default = "default_validation_enabled")]
        validation_enabled: bool,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        max_content_bytes: Option<u32>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        max_name_length: Option<usize>,
    },
    /// Gated and validated by `step`; the outcome is reported as a
    /// `MemoryWritten` or `MemoryValidationFailed` observation.
    WriteMemory {
        memory: crate::mm::memory::MemoryWriteRequest,
    },
    /// Reported back as a `MemoryQueried` observation.
    QueryMemory {
        query: crate::mm::memory::MemoryQuery,
    },
    /// Currently only drains pending observations; the payload is not used by
    /// `step`.
    MemoryRetrievalResult {
        retrieval: crate::mm::memory::MemoryRetrieval,
    },
    /// Fed to the state machine as `LoopEvent::Timeout`.
    Timeout,
}
440
/// Serde default for `SetMemoryPolicy::stale_warning_days`.
fn default_stale_days() -> u32 {
    const DEFAULT_STALE_WARNING_DAYS: u32 = 2;
    DEFAULT_STALE_WARNING_DAYS
}
/// Serde default for `SetMemoryPolicy::retrieval_top_k`.
fn default_top_k() -> usize {
    const DEFAULT_RETRIEVAL_TOP_K: usize = 5;
    DEFAULT_RETRIEVAL_TOP_K
}
/// Serde default for `SetMemoryPolicy::validation_enabled`.
fn default_validation_enabled() -> bool {
    const VALIDATION_ON_BY_DEFAULT: bool = true;
    VALIDATION_ON_BY_DEFAULT
}
444
/// Result of one `KernelRuntime::step` call.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KernelStep {
    /// Set to [`KERNEL_ABI_VERSION`] by the constructors in this module.
    pub version: u32,
    /// Actions for the host to carry out; the constructors in this module
    /// produce either zero or one action.
    pub actions: Vec<KernelAction>,
    /// Observations drained from the state machine for this step.
    pub observations: Vec<KernelObservation>,
}
451
452impl KernelStep {
453 fn empty(observations: Vec<KernelObservation>) -> Self {
454 Self {
455 version: KERNEL_ABI_VERSION,
456 actions: Vec::new(),
457 observations,
458 }
459 }
460
461 fn single(action: LoopAction, observations: Vec<KernelObservation>) -> Self {
462 Self {
463 version: KERNEL_ABI_VERSION,
464 actions: vec![action.into()],
465 observations,
466 }
467 }
468}
469
/// Action the runtime asks the host to perform.
///
/// Internally tagged on the wire: the `kind` field carries the snake_case
/// variant name. Values are produced from `LoopAction` via `From`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum KernelAction {
    /// Converted from `LoopAction::CallLLM`.
    CallProvider {
        context: RenderedContext,
        tools: Vec<ToolSchema>,
    },
    /// Converted from `LoopAction::ExecuteTools`.
    ExecuteTool {
        calls: Vec<ToolCall>,
    },
    /// Converted field-for-field from `LoopAction::EvaluateMilestone`.
    EvaluateMilestone {
        phase_id: String,
        criteria: Vec<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        verifier: Option<crate::types::milestone::MilestoneVerifier>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        required_evidence: Vec<String>,
    },
    /// Converted from `LoopAction::Done`.
    Done {
        result: LoopResult,
    },
}
492
493impl From<LoopAction> for KernelAction {
494 fn from(action: LoopAction) -> Self {
495 match action {
496 LoopAction::AwaitingResume => {
497 panic!("AwaitingResume must not be converted to KernelAction")
498 }
499 LoopAction::CallLLM { context, tools } => Self::CallProvider { context, tools },
500 LoopAction::ExecuteTools { calls } => Self::ExecuteTool { calls },
501 LoopAction::EvaluateMilestone {
502 phase_id,
503 criteria,
504 verifier,
505 required_evidence,
506 } => Self::EvaluateMilestone {
507 phase_id,
508 criteria,
509 verifier,
510 required_evidence,
511 },
512 LoopAction::Done { result } => Self::Done { result },
513 }
514 }
515}
516
/// Side-channel facts reported alongside the actions of a [`KernelStep`].
///
/// Internally tagged on the wire: the `kind` field carries the snake_case
/// variant name. Most variants are emitted by the state machine outside this
/// file; only the memory-related ones noted below are constructed by
/// `KernelRuntime::step` itself.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum KernelObservation {
    Compressed {
        action: KernelPressureAction,
        rho_after: f64,
        summary: Option<String>,
        archived: Vec<Message>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        invalidates_prefix_at: Option<usize>,
    },
    Renewed {
        sprint: u32,
    },
    Rollbacked {
        turn: u32,
        checkpoint_history_len: u32,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        reason: Option<RollbackReason>,
    },
    CapabilityChanged {
        turn: u32,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        added: Vec<String>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        removed: Vec<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        change_kind: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        capability_id: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        version: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        mounted_by: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        mount_reason: Option<String>,
    },
    MilestoneAdvanced {
        turn: u32,
        phase_id: String,
        capabilities_unlocked: Vec<String>,
    },
    MilestoneBlocked {
        turn: u32,
        phase_id: String,
        reason: String,
    },
    MilestoneEvidence {
        turn: u32,
        phase_id: String,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        evidence: Vec<String>,
    },
    CheckpointTaken {
        turn: u32,
        history_len: u32,
    },
    AgentProcessChanged {
        turn: u32,
        agent_id: String,
        parent_session_id: String,
        role: String,
        isolation: String,
        context_inheritance: String,
        state: String,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        permitted_capability_ids: Vec<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        result_termination: Option<String>,
    },
    WorkflowBatchSpawned {
        turn: u32,
        nodes: Vec<crate::orchestration::workflow::WorkflowSpawnInfo>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        budget: Option<crate::orchestration::workflow::WorkflowBudget>,
    },
    WorkflowCompleted {
        turn: u32,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        completed: Vec<String>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        failed: Vec<String>,
    },
    AgentPreempted {
        turn: u32,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        agent_ids: Vec<String>,
        reason: String,
    },
    ToolGated {
        turn: u32,
        call_id: String,
        tool: String,
        reason: String,
    },
    SignalDisposed {
        turn: u32,
        signal_id: String,
        disposition: String,
        queue_depth: u32,
    },
    BudgetExceeded { turn: u32, budget: String },
    Suspended {
        turn: u32,
        reason: String,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pending_calls: Vec<String>,
    },
    Resumed {
        turn: u32,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        approved: Vec<String>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        denied: Vec<String>,
    },
    PageOut {
        turn: u32,
        action: KernelPressureAction,
        rho_after: f64,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        summary: Option<String>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        archived: Vec<Message>,
        tier_hint: String,
    },
    PageInRequested {
        turn: u32,
        call_id: String,
        tool: String,
        query: String,
        top_k: u32,
    },
    /// Emitted by `KernelRuntime::step` when a `WriteMemory` request passes
    /// gating and validation. `memory_id` is the request's metadata name and
    /// `size_bytes` is derived from the content length.
    MemoryWritten {
        turn: u32,
        memory_id: String,
        memory_kind: String,
        size_bytes: u32,
    },
    /// Emitted by `KernelRuntime::step` when a `WriteMemory` request is
    /// refused by the syscall gate or fails validation; `error` is a
    /// human-readable description.
    MemoryValidationFailed {
        turn: u32,
        memory_id: String,
        error: String,
    },
    /// Emitted by `KernelRuntime::step` for every `QueryMemory` event;
    /// `step` always sets `requires_async_response` to `true`.
    MemoryQueried {
        turn: u32,
        query_context: String,
        requested_k: usize,
        requires_async_response: bool,
    },
    LargeResultSpooled {
        turn: u32,
        call_id: String,
        tool: String,
        original_size: u32,
        preview_size: u32,
        spool_ref: Option<String>,
    },
}
704
/// Checkpoint/rollback observations as a standalone type.
///
/// Its two variants carry the same fields as the identically named variants
/// of [`KernelObservation`] and use the same internally tagged snake_case
/// encoding.
// NOTE(review): no conversion to or from `KernelObservation` is visible in
// this part of the file — confirm where this type is consumed.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum TransactionObservation {
    CheckpointTaken { turn: u32, history_len: u32 },
    Rollbacked {
        turn: u32,
        checkpoint_history_len: u32,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        reason: Option<crate::runtime::session::RollbackReason>,
    },
}
721
/// Wire-level mirror of `PressureAction`, serialized in snake_case
/// (`none`, `snip_compact`, `micro_compact`, `context_collapse`,
/// `auto_compact`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum KernelPressureAction {
    None,
    SnipCompact,
    MicroCompact,
    ContextCollapse,
    AutoCompact,
}
731
732impl From<PressureAction> for KernelPressureAction {
733 fn from(action: PressureAction) -> Self {
734 match action {
735 PressureAction::None => Self::None,
736 PressureAction::SnipCompact => Self::SnipCompact,
737 PressureAction::MicroCompact => Self::MicroCompact,
738 PressureAction::ContextCollapse => Self::ContextCollapse,
739 PressureAction::AutoCompact => Self::AutoCompact,
740 }
741 }
742}
743
/// Event-driven facade over a [`LoopStateMachine`]: callers feed
/// [`KernelInput`] values to `step` and receive [`KernelStep`] values back.
pub struct KernelRuntime {
    /// The wrapped state machine; all events are applied to it.
    sm: LoopStateMachine,
}
749
750impl KernelRuntime {
751 pub fn new(policy: LoopPolicy) -> Self {
752 Self {
753 sm: LoopStateMachine::new(policy),
754 }
755 }
756
757 pub fn state_machine(&self) -> &LoopStateMachine {
758 &self.sm
759 }
760
761 pub fn state_machine_mut(&mut self) -> &mut LoopStateMachine {
762 &mut self.sm
763 }
764
765 pub fn is_terminal(&self) -> bool {
766 self.sm.is_terminal()
767 }
768
769 pub fn step(&mut self, input: KernelInput) -> KernelStep {
770 let action = match input.event {
771 KernelInputEvent::SetTools { tools } => {
772 self.sm.tools = tools;
773 return KernelStep::empty(self.sm.take_observations());
774 }
775 KernelInputEvent::SetAvailableSkills { skills } => {
776 self.sm.ctx.set_available_skills(skills);
777 return KernelStep::empty(self.sm.take_observations());
778 }
779 KernelInputEvent::SkillActivated { name } => {
780 self.sm.ctx.activate_skill(name);
783 return KernelStep::empty(self.sm.take_observations());
784 }
785 KernelInputEvent::SetStableCoreTools { tool_ids } => {
786 self.sm.ctx.set_stable_core_tools(tool_ids.into_iter().map(Into::into));
787 return KernelStep::empty(self.sm.take_observations());
788 }
789 KernelInputEvent::SetMemoryEnabled { enabled } => {
790 self.sm.ctx.set_memory_enabled(enabled);
791 return KernelStep::empty(self.sm.take_observations());
792 }
793 KernelInputEvent::SetKnowledgeEnabled { enabled } => {
794 self.sm.ctx.set_knowledge_enabled(enabled);
795 return KernelStep::empty(self.sm.take_observations());
796 }
797 KernelInputEvent::SetPlanToolEnabled { enabled } => {
798 self.sm.ctx.set_plan_tool_enabled(enabled);
799 return KernelStep::empty(self.sm.take_observations());
800 }
801 KernelInputEvent::SetTokenizer { .. } => {
802 self.sm.ctx.engine = ContextTokenEngine::char_approx();
806 return KernelStep::empty(self.sm.take_observations());
807 }
808 KernelInputEvent::AddSystemMessage { content, tokens } => {
809 self.sm
810 .ctx
811 .partitions
812 .system
813 .push(Message::system(content), tokens.max(1));
814 return KernelStep::empty(self.sm.take_observations());
815 }
816 KernelInputEvent::AddKnowledgeMessage { content, tokens } => {
817 self.sm.ctx.partitions.knowledge.push(Message::system(content), tokens.max(1));
826 return KernelStep::empty(self.sm.take_observations());
827 }
828 KernelInputEvent::AddHistoryMessage { message, tokens } => {
829 let tokens = tokens.unwrap_or_else(|| self.sm.ctx.engine.count_message(&message));
830 self.sm.ctx.push_history(message, tokens.max(1));
831 return KernelStep::empty(self.sm.take_observations());
832 }
833 KernelInputEvent::PreloadHistory { messages } => {
834 self.sm.preload_history(messages);
835 return KernelStep::empty(self.sm.take_observations());
836 }
837 KernelInputEvent::MountCapability { capability } => {
838 self.sm.mount_capability(capability, None, None);
839 return KernelStep::empty(self.sm.take_observations());
840 }
841 KernelInputEvent::UnmountCapability {
842 capability_kind,
843 id,
844 } => {
845 self.sm.unmount_capability(capability_kind, &id);
846 return KernelStep::empty(self.sm.take_observations());
847 }
848 KernelInputEvent::LoadMilestoneContract { contract } => {
849 self.sm.load_milestone_contract(contract);
850 return KernelStep::empty(self.sm.take_observations());
851 }
852 KernelInputEvent::LoadGovernancePolicy {
853 default_action,
854 rules,
855 vetoed_tools,
856 rate_limits,
857 constraints,
858 } => {
859 self.sm.set_governance(build_governance_pipeline(
860 default_action,
861 rules,
862 vetoed_tools,
863 rate_limits,
864 constraints,
865 ));
866 return KernelStep::empty(self.sm.take_observations());
867 }
868 KernelInputEvent::ConfigureRun { config } => {
869 let RunConfig {
876 tools,
877 available_skills,
878 stable_core_tools,
879 memory_enabled,
880 knowledge_enabled,
881 plan_tool_enabled,
882 tokenizer,
883 governance,
884 attention_max_queue_size,
885 scheduler_max_wall_ms,
886 resource_quota,
887 } = config;
888 if let Some(tools) = tools {
889 self.sm.tools = tools;
890 }
891 if let Some(skills) = available_skills {
892 self.sm.ctx.set_available_skills(skills);
893 }
894 if let Some(ids) = stable_core_tools {
895 self.sm.ctx.set_stable_core_tools(ids.into_iter().map(Into::into));
896 }
897 if let Some(enabled) = memory_enabled {
898 self.sm.ctx.set_memory_enabled(enabled);
899 }
900 if let Some(enabled) = knowledge_enabled {
901 self.sm.ctx.set_knowledge_enabled(enabled);
902 }
903 if let Some(enabled) = plan_tool_enabled {
904 self.sm.ctx.set_plan_tool_enabled(enabled);
905 }
906 if tokenizer.is_some() {
907 self.sm.ctx.engine = ContextTokenEngine::char_approx();
908 }
909 if let Some(g) = governance {
910 self.sm.set_governance(build_governance_pipeline(
911 g.default_action,
912 g.rules,
913 g.vetoed_tools,
914 g.rate_limits,
915 g.constraints,
916 ));
917 }
918 if let Some(max_queue) = attention_max_queue_size {
919 self.sm.set_attention(max_queue as usize);
920 }
921 if let Some(ms) = scheduler_max_wall_ms {
922 self.sm.set_wall_budget(Some(ms));
923 }
924 if let Some(quota) = resource_quota {
925 self.sm.set_resource_quota(quota);
926 }
927 return KernelStep::empty(self.sm.take_observations());
928 }
929 KernelInputEvent::SetAttentionPolicy { max_queue_size } => {
930 self.sm.set_attention(max_queue_size as usize);
931 return KernelStep::empty(self.sm.take_observations());
932 }
933 KernelInputEvent::PageIn { entries } => {
934 self.sm.apply_page_in(&entries);
935 return KernelStep::empty(self.sm.take_observations());
936 }
937 KernelInputEvent::ForceCompact => {
938 self.sm.force_compact();
939 return KernelStep::empty(self.sm.take_observations());
940 }
941 KernelInputEvent::UpdateTask { update } => {
942 self.sm.ctx.update_task(update);
943 return KernelStep::empty(self.sm.take_observations());
944 }
945 KernelInputEvent::StartRun { task, run_spec } => {
946 self.sm.run_spec = run_spec;
947 self.sm.start(task)
948 }
949 KernelInputEvent::CapabilityCommand { command } => {
950 self.sm.execute_capability_command(command);
951 return KernelStep::empty(self.sm.take_observations());
952 }
953 KernelInputEvent::Resume { approved_calls, denied_calls } => {
954 let action = self.sm.resume_from_suspend(approved_calls, denied_calls);
955 if matches!(action, LoopAction::AwaitingResume) {
956 return KernelStep::empty(self.sm.take_observations());
957 }
958 return KernelStep::single(action, self.sm.take_observations());
959 }
960 KernelInputEvent::SetSchedulerBudget { max_wall_ms } => {
961 self.sm.set_wall_budget(max_wall_ms);
962 return KernelStep::empty(self.sm.take_observations());
963 }
964 KernelInputEvent::SetResourceQuota { quota } => {
965 self.sm.set_resource_quota(quota);
966 return KernelStep::empty(self.sm.take_observations());
967 }
968 KernelInputEvent::ProviderResult {
969 message,
970 observed_input_tokens,
971 observed_output_tokens: _,
972 now_ms,
973 } => {
974 if let Some(tokens) = observed_input_tokens {
975 self.sm.ctx.set_observed_prompt_tokens(tokens);
976 }
977 if let Some(ms) = now_ms {
980 self.sm.set_observed_time(ms);
981 }
982 self.sm.feed(LoopEvent::LLMResponse { message })
983 }
984 KernelInputEvent::ToolResults { results } => {
985 self.sm.feed(LoopEvent::ToolResults { results })
986 }
987 KernelInputEvent::Signal { signal } => match self.sm.signal_event(signal) {
988 Some(action) => action,
989 None => return KernelStep::empty(self.sm.take_observations()),
992 },
993 KernelInputEvent::MilestoneResult { result } => {
994 self.sm.feed(LoopEvent::MilestoneResult { result })
995 }
996 KernelInputEvent::SpawnSubAgent {
997 spec,
998 parent_session_id,
999 } => {
1000 let action = self.sm.spawn_sub_agent(spec, &parent_session_id);
1001 if matches!(action, LoopAction::AwaitingResume) {
1002 return KernelStep::empty(self.sm.take_observations());
1003 }
1004 return KernelStep::single(action, self.sm.take_observations());
1005 }
1006 KernelInputEvent::LoadWorkflow {
1007 spec,
1008 parent_session_id,
1009 resumed_completed,
1010 resumed_submissions,
1011 } => {
1012 self.sm.ensure_started_for_workflow(&spec);
1016 let action = if resumed_completed.is_empty() && resumed_submissions.is_empty() {
1017 self.sm.load_workflow(spec, &parent_session_id)
1018 } else {
1019 self.sm.load_workflow_resumed(
1020 spec,
1021 &parent_session_id,
1022 &resumed_submissions,
1023 &resumed_completed,
1024 )
1025 };
1026 if matches!(action, LoopAction::AwaitingResume) {
1027 return KernelStep::empty(self.sm.take_observations());
1028 }
1029 return KernelStep::single(action, self.sm.take_observations());
1030 }
1031 KernelInputEvent::SubAgentCompleted { result } => {
1032 self.sm.feed(LoopEvent::SubAgentCompleted { result })
1033 }
1034 KernelInputEvent::SubmitWorkflowNodes {
1035 nodes,
1036 submitter_agent_id,
1037 } => {
1038 let action = self
1039 .sm
1040 .submit_workflow_nodes(nodes, submitter_agent_id.as_deref());
1041 if matches!(action, LoopAction::AwaitingResume) {
1042 return KernelStep::empty(self.sm.take_observations());
1043 }
1044 return KernelStep::single(action, self.sm.take_observations());
1045 }
1046 KernelInputEvent::SubmitWorkflow {
1047 spec,
1048 parent_session_id,
1049 submitter_agent_id,
1050 } => {
1051 let action = self.sm.submit_workflow(
1052 spec,
1053 &parent_session_id,
1054 submitter_agent_id.as_deref(),
1055 );
1056 if matches!(action, LoopAction::AwaitingResume) {
1057 return KernelStep::empty(self.sm.take_observations());
1058 }
1059 return KernelStep::single(action, self.sm.take_observations());
1060 }
1061 KernelInputEvent::SetMemoryPolicy {
1062 memory_path,
1063 stale_warning_days,
1064 retrieval_top_k,
1065 validation_enabled,
1066 max_content_bytes,
1067 max_name_length,
1068 } => {
1069 self.sm.set_memory_policy(crate::mm::memory::MemoryPolicy {
1073 memory_path,
1074 stale_warning_days,
1075 retrieval_top_k,
1076 validation_enabled,
1077 max_content_bytes,
1078 max_name_length,
1079 });
1080 return KernelStep::empty(self.sm.take_observations());
1081 }
1082 KernelInputEvent::WriteMemory { memory } => {
1083 use crate::mm::memory::validate_memory_write;
1086 let turn = self.sm.turn;
1087 let disposition = self
1091 .sm
1092 .gate_syscall(&crate::syscall::Syscall::WriteMemory(memory.clone()));
1093 if !disposition.is_allowed() {
1094 let error = match disposition {
1095 crate::syscall::Disposition::RateLimited { retry_after_ms } => {
1096 format!("memory write rate limited; retry after {retry_after_ms}ms")
1097 }
1098 crate::syscall::Disposition::Deny { reason, .. } => {
1099 format!("memory write denied: {reason}")
1100 }
1101 _ => "memory write not permitted".to_string(),
1102 };
1103 self.sm.observations.push(
1104 KernelObservation::MemoryValidationFailed {
1105 turn,
1106 memory_id: memory.metadata.name.clone(),
1107 error,
1108 },
1109 );
1110 return KernelStep::empty(self.sm.take_observations());
1111 }
1112 let validation_result = match self.sm.memory_policy() {
1116 Some(p) if !p.validation_enabled => Ok(()),
1117 Some(p) => p.validation().validate(&memory),
1118 None => validate_memory_write(&memory),
1119 };
1120 match validation_result {
1121 Ok(()) => {
1122 self.sm.observations.push(KernelObservation::MemoryWritten {
1124 turn,
1125 memory_id: memory.metadata.name.clone(),
1126 memory_kind: memory.metadata.kind.map(|k| k.label()).unwrap_or_else(|| {
1127 crate::mm::memory::MemoryKind::infer_from_metadata(&memory.metadata).label()
1128 }).to_string(),
1129 size_bytes: memory.content.len() as u32,
1130 });
1131 }
1132 Err(err) => {
1133 use crate::mm::memory::MemoryValidationError;
1135 let error_msg = match err {
1136 MemoryValidationError::MissingRequiredField { field } => format!("Missing required field: {}", field),
1137 MemoryValidationError::ContentTooLarge { size, limit } => format!("Content too large: {} bytes (limit: {})", size, limit),
1138 MemoryValidationError::ForbiddenPattern { pattern, reason } => format!("Forbidden pattern '{}': {}", pattern, reason),
1139 MemoryValidationError::InvalidKind { kind } => format!("Invalid kind: {}", kind),
1140 MemoryValidationError::NameTooLong { length, limit } => format!("Name too long: {} chars (limit: {})", length, limit),
1141 };
1142 self.sm.observations.push(KernelObservation::MemoryValidationFailed {
1143 turn,
1144 memory_id: memory.metadata.name.clone(),
1145 error: error_msg,
1146 });
1147 }
1148 }
1149 return KernelStep::empty(self.sm.take_observations());
1150 }
1151 KernelInputEvent::QueryMemory { query } => {
1152 let turn = self.sm.turn;
1155 let requested_k = match self.sm.memory_policy() {
1157 Some(p) => p.clamp_top_k(query.top_k),
1158 None => query.top_k,
1159 };
1160 self.sm.observations.push(KernelObservation::MemoryQueried {
1161 turn,
1162 query_context: query.current_context.clone(),
1163 requested_k,
1164 requires_async_response: true,
1165 });
1166 return KernelStep::empty(self.sm.take_observations());
1167 }
1168 KernelInputEvent::MemoryRetrievalResult { .. } => {
1169 return KernelStep::empty(self.sm.take_observations());
1171 }
1172 KernelInputEvent::Timeout => self.sm.feed(LoopEvent::Timeout),
1173 };
1174 if matches!(action, LoopAction::AwaitingResume) {
1175 return KernelStep::empty(self.sm.take_observations());
1176 }
1177 KernelStep::single(action, self.sm.take_observations())
1178 }
1179}
1180
1181#[cfg(test)]
1182mod tests {
1183 use super::*;
1184
1185 #[test]
1186 fn start_run_returns_versioned_provider_action() {
1187 let mut runtime = KernelRuntime::new(LoopPolicy::default());
1188 let step = runtime.step(KernelInput::new(KernelInputEvent::StartRun {
1189 task: RuntimeTask::new("ship it"),
1190 run_spec: None,
1191 }));
1192
1193 assert_eq!(step.version, KERNEL_ABI_VERSION);
1194 assert!(matches!(
1195 step.actions.as_slice(),
1196 [KernelAction::CallProvider { .. }]
1197 ));
1198 }
1199
/// After `StartRun`, a `ProviderResult` carrying `Message::assistant("done")`
/// must produce exactly one `Done` action.
#[test]
fn provider_text_response_returns_done() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let start = KernelInputEvent::StartRun {
        task: RuntimeTask::new("ship it"),
        run_spec: None,
    };
    kernel.step(KernelInput::new(start));

    let reply = KernelInputEvent::ProviderResult {
        message: Message::assistant("done"),
        observed_input_tokens: None,
        observed_output_tokens: None,
        now_ms: None,
    };
    let outcome = kernel.step(KernelInput::new(reply));

    let finished = matches!(outcome.actions.as_slice(), [KernelAction::Done { .. }]);
    assert!(finished);
}
1219
/// `SetTools` is configuration: the step carries no actions and the state
/// machine ends up holding the single supplied tool schema.
#[test]
fn config_inputs_mutate_runtime_without_actions() {
    let echo = ToolSchema {
        name: "echo".into(),
        description: "Echo input".to_string(),
        parameters: serde_json::json!({"type": "object"}),
    };

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let set_tools = KernelInputEvent::SetTools { tools: vec![echo] };
    let outcome = kernel.step(KernelInput::new(set_tools));

    assert!(outcome.actions.is_empty());
    assert_eq!(kernel.state_machine().tools.len(), 1);
}
1234
/// Activating a previously advertised skill yields no actions, records the
/// skill as active, and exposes a tool filter with two entries.
#[test]
fn skill_activated_input_records_active_skill() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());

    // Advertise a skill that restricts itself to two tools.
    let mut debug_skill = SkillMetadata::new("debug", "Debug helper");
    debug_skill.allowed_tools = vec!["read".into(), "grep".into()];
    let advertise = KernelInputEvent::SetAvailableSkills {
        skills: vec![debug_skill],
    };
    kernel.step(KernelInput::new(advertise));

    let activate = KernelInputEvent::SkillActivated {
        name: "debug".to_string(),
    };
    let outcome = kernel.step(KernelInput::new(activate));
    assert!(outcome.actions.is_empty(), "activation is config, not an action");

    let state = kernel.state_machine();
    assert!(state.ctx.active_skills.contains("debug"));
    let tool_filter = state.ctx.active_skill_tool_filter().unwrap();
    assert_eq!(tool_filter.len(), 2);
}
1255
/// `UpdateTask` yields no actions and writes the supplied progress text into
/// the task-state partition.
#[test]
fn update_task_input_mutates_task_state() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let update = TaskUpdate {
        progress: Some("tools executed".to_string()),
        ..Default::default()
    };
    let outcome = kernel.step(KernelInput::new(KernelInputEvent::UpdateTask { update }));

    assert!(outcome.actions.is_empty());
    let state = kernel.state_machine();
    assert_eq!(state.ctx.partitions.task_state.progress, "tools executed");
}
1272
/// `AddKnowledgeMessage` yields no actions and leaves exactly one message in
/// the knowledge partition of a fresh runtime.
#[test]
fn add_knowledge_message_enters_knowledge_partition() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let add = KernelInputEvent::AddKnowledgeMessage {
        content: "skill: debug".to_string(),
        tokens: 10,
    };
    let outcome = kernel.step(KernelInput::new(add));

    assert!(outcome.actions.is_empty());
    let state = kernel.state_machine();
    assert_eq!(state.ctx.partitions.knowledge.messages.len(), 1);
}
1287
/// Mounting a capability yields no actions and exactly one
/// `CapabilityChanged` observation.
#[test]
fn capability_mount_emits_observation() {
    let docs_server = CapabilityDescriptor::marker(
        CapabilityKind::McpServer,
        "docs",
        "Documentation server",
    );

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let mount = KernelInputEvent::MountCapability {
        capability: docs_server,
    };
    let outcome = kernel.step(KernelInput::new(mount));

    assert!(outcome.actions.is_empty());
    let changed = matches!(
        outcome.observations.as_slice(),
        [KernelObservation::CapabilityChanged { .. }]
    );
    assert!(changed);
}
1305
/// `SpawnSubAgent` after `StartRun` must: emit no actions, announce the worker
/// as running under the parent session, register the process, and suspend the
/// state machine waiting on a sub-agent join.
#[test]
fn spawn_sub_agent_input_registers_process() {
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let start = KernelInputEvent::StartRun {
        task: RuntimeTask::new("parent task"),
        run_spec: None,
    };
    kernel.step(KernelInput::new(start));
    // Discard start-up observations so only the spawn's are inspected below.
    kernel.state_machine_mut().take_observations();

    let worker = AgentRunSpec::new(
        AgentIdentity::sub_agent("worker", "worker-session"),
        AgentRole::Implement,
        "do work",
    );
    let spawn = KernelInputEvent::SpawnSubAgent {
        spec: worker,
        parent_session_id: "parent-session".to_string(),
    };
    let outcome = kernel.step(KernelInput::new(spawn));

    assert!(outcome.actions.is_empty());

    let announced_running = outcome.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::AgentProcessChanged {
                agent_id,
                parent_session_id,
                state,
                ..
            } if agent_id == "worker" && parent_session_id == "parent-session" && state == "running"
        )
    });
    assert!(announced_running);

    let process = kernel
        .state_machine()
        .agent_process("worker")
        .expect("process");
    assert_eq!(process.parent_session_id.as_str(), "parent-session");

    let announced_suspend = outcome.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::Suspended { reason, .. } if reason == "sub_agent_await"
        )
    });
    assert!(announced_suspend);

    assert!(kernel.state_machine().is_suspended());
    let waiting_on_join = matches!(
        kernel.state_machine().wait_reason(),
        Some(crate::scheduler::tcb::WaitReason::SubAgentJoin(_))
    );
    assert!(waiting_on_join);
}
1356
/// With `max_spawn_depth` set to zero, `SpawnSubAgent` must not register or
/// announce the worker and must not suspend; the step is expected to carry a
/// single provider call instead.
#[test]
fn set_resource_quota_input_denies_spawn_over_quota() {
    use crate::governance::quota::ResourceQuota;
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};

    let mut kernel = KernelRuntime::new(LoopPolicy::default());

    let quota = ResourceQuota {
        max_spawn_depth: Some(0),
        ..ResourceQuota::default()
    };
    let configured = kernel.step(KernelInput::new(KernelInputEvent::SetResourceQuota { quota }));
    assert!(configured.actions.is_empty(), "config input yields no actions");

    let start = KernelInputEvent::StartRun {
        task: RuntimeTask::new("parent task"),
        run_spec: None,
    };
    kernel.step(KernelInput::new(start));
    kernel.state_machine_mut().take_observations();

    let worker = AgentRunSpec::new(
        AgentIdentity::sub_agent("worker", "worker-session"),
        AgentRole::Implement,
        "do work",
    );
    let spawn = KernelInputEvent::SpawnSubAgent {
        spec: worker,
        parent_session_id: "parent-session".to_string(),
    };
    let outcome = kernel.step(KernelInput::new(spawn));

    let reprompted = matches!(
        outcome.actions.as_slice(),
        [KernelAction::CallProvider { .. }]
    );
    assert!(reprompted);

    let worker_announced = outcome.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::AgentProcessChanged { agent_id, .. } if agent_id == "worker"
        )
    });
    assert!(!worker_announced);

    assert!(kernel.state_machine().agent_process("worker").is_none());
    assert!(!kernel.state_machine().is_suspended());
}
1398
/// Without any quota input, `SpawnSubAgent` registers the worker process and
/// leaves the state machine suspended.
#[test]
fn default_runtime_leaves_spawn_unquota_ed() {
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let start = KernelInputEvent::StartRun {
        task: RuntimeTask::new("parent task"),
        run_spec: None,
    };
    kernel.step(KernelInput::new(start));
    kernel.state_machine_mut().take_observations();

    let worker = AgentRunSpec::new(
        AgentIdentity::sub_agent("worker", "worker-session"),
        AgentRole::Implement,
        "do work",
    );
    let spawn = KernelInputEvent::SpawnSubAgent {
        spec: worker,
        parent_session_id: "parent-session".to_string(),
    };
    kernel.step(KernelInput::new(spawn));

    let state = kernel.state_machine();
    assert!(state.agent_process("worker").is_some());
    assert!(state.is_suspended());
}
1423
/// Pins the serialized form of `AgentProcessChanged`: the multi-word values
/// for `isolation` and `context_inheritance` are expected as single lowercase
/// words, alongside the `role` and `state` strings.
#[test]
fn agent_process_changed_locks_multiword_wire_form() {
    use crate::types::agent::{AgentIdentity, AgentIsolation, AgentRole, AgentRunSpec};

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let start = KernelInputEvent::StartRun {
        task: RuntimeTask::new("parent task"),
        run_spec: None,
    };
    kernel.step(KernelInput::new(start));
    kernel.state_machine_mut().take_observations();

    let verifier = AgentRunSpec::new(
        AgentIdentity::sub_agent("worker", "worker-session"),
        AgentRole::Verify,
        "do work",
    )
    .with_isolation(AgentIsolation::ReadOnly);
    let spawn = KernelInputEvent::SpawnSubAgent {
        spec: verifier,
        parent_session_id: "parent-session".to_string(),
    };
    let outcome = kernel.step(KernelInput::new(spawn));

    let process_changed = outcome
        .observations
        .iter()
        .find(|obs| matches!(obs, KernelObservation::AgentProcessChanged { .. }))
        .expect("agent_process_changed observation");
    let wire = serde_json::to_value(process_changed).unwrap();

    assert_eq!(wire["isolation"], "readonly", "isolation must stay debug-lowercase");
    assert_eq!(
        wire["context_inheritance"], "systemonly",
        "context_inheritance must stay debug-lowercase"
    );
    assert_eq!(wire["role"], "verify");
    assert_eq!(wire["state"], "running");
}
1465
1466 fn write_memory(runtime: &mut KernelRuntime, name: &str, content: &str) -> KernelStep {
1469 use crate::mm::memory::{MemoryMetadata, MemoryWriteRequest};
1470 runtime.step(KernelInput::new(KernelInputEvent::WriteMemory {
1471 memory: MemoryWriteRequest {
1472 metadata: MemoryMetadata {
1473 name: name.to_string(),
1474 description: "desc".to_string(),
1475 ..Default::default()
1476 },
1477 content: content.to_string(),
1478 },
1479 }))
1480 }
1481
/// With `validation_enabled: false`, the write must be reported as
/// `MemoryWritten` and no `MemoryValidationFailed` may be observed.
#[test]
fn memory_policy_validation_disabled_admits_forbidden_write() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let policy = KernelInputEvent::SetMemoryPolicy {
        memory_path: String::new(),
        stale_warning_days: 2,
        retrieval_top_k: 5,
        validation_enabled: false,
        max_content_bytes: None,
        max_name_length: None,
    };
    kernel.step(KernelInput::new(policy));

    let outcome = write_memory(&mut kernel, "note", "代码模式: foo");

    // One pass over the observations, noting which of the two kinds appear.
    let mut written = false;
    let mut rejected = false;
    for obs in outcome.observations.iter() {
        match obs {
            KernelObservation::MemoryWritten { .. } => written = true,
            KernelObservation::MemoryValidationFailed { .. } => rejected = true,
            _ => {}
        }
    }
    assert!(written);
    assert!(!rejected);
}
1504
/// On a runtime with no memory policy input, the same write must surface a
/// `MemoryValidationFailed` observation.
#[test]
fn default_runtime_validates_forbidden_write() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let outcome = write_memory(&mut kernel, "note", "代码模式: foo");

    let rejected = outcome
        .observations
        .iter()
        .any(|obs| matches!(obs, KernelObservation::MemoryValidationFailed { .. }));
    assert!(rejected);
}
1515
/// With `max_content_bytes: Some(8)`, a longer write must fail validation with
/// an error message containing "too large".
#[test]
fn memory_policy_size_override_rejects_oversized_write() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let policy = KernelInputEvent::SetMemoryPolicy {
        memory_path: String::new(),
        stale_warning_days: 2,
        retrieval_top_k: 5,
        validation_enabled: true,
        max_content_bytes: Some(8),
        max_name_length: None,
    };
    kernel.step(KernelInput::new(policy));

    let outcome = write_memory(&mut kernel, "note", "this content is well over eight bytes");

    // Only the first validation failure is inspected.
    let rejection = outcome.observations.iter().find_map(|obs| {
        if let KernelObservation::MemoryValidationFailed { error, .. } = obs {
            Some(error.as_str())
        } else {
            None
        }
    });
    assert!(rejection.is_some_and(|msg| msg.contains("too large")));
}
1534
/// With `retrieval_top_k: 3` configured, a query asking for 50 results must be
/// reported with `requested_k == 3`.
#[test]
fn memory_policy_clamps_retrieval_top_k() {
    use crate::mm::memory::MemoryQuery;

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let policy = KernelInputEvent::SetMemoryPolicy {
        memory_path: String::new(),
        stale_warning_days: 2,
        retrieval_top_k: 3,
        validation_enabled: true,
        max_content_bytes: None,
        max_name_length: None,
    };
    kernel.step(KernelInput::new(policy));

    let query = MemoryQuery {
        top_k: 50,
        ..Default::default()
    };
    let outcome = kernel.step(KernelInput::new(KernelInputEvent::QueryMemory { query }));

    let requested = outcome.observations.iter().find_map(|obs| {
        if let KernelObservation::MemoryQueried { requested_k, .. } = obs {
            Some(*requested_k)
        } else {
            None
        }
    });
    assert_eq!(requested, Some(3));
}
1556
/// Without a memory policy, the `top_k` from the query is reported unchanged
/// in the `MemoryQueried` observation.
#[test]
fn default_runtime_uses_requested_top_k_verbatim() {
    use crate::mm::memory::MemoryQuery;

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let query = MemoryQuery {
        top_k: 50,
        ..Default::default()
    };
    let outcome = kernel.step(KernelInput::new(KernelInputEvent::QueryMemory { query }));

    let requested = outcome.observations.iter().find_map(|obs| {
        if let KernelObservation::MemoryQueried { requested_k, .. } = obs {
            Some(*requested_k)
        } else {
            None
        }
    });
    assert_eq!(requested, Some(50));
}
1570
/// With `max_wall_ms: Some(10)` and a provider result stamped `now_ms: 100`,
/// the step following the tool results is expected to be a single provider
/// call that offers no tools.
#[test]
fn provider_result_now_ms_drives_wall_time_budget() {
    let policy = LoopPolicy {
        max_wall_ms: Some(10),
        ..LoopPolicy::default()
    };
    let mut kernel = KernelRuntime::new(policy);
    let start = KernelInputEvent::StartRun {
        task: RuntimeTask::new("ship it"),
        run_spec: None,
    };
    kernel.step(KernelInput::new(start));

    // Assistant turn that requests one `echo` call, observed at t = 100 ms.
    let mut reply = Message::assistant("");
    reply.tool_calls.push(ToolCall {
        id: "call-1".into(),
        name: "echo".into(),
        arguments: serde_json::json!({}),
    });
    let provider_result = KernelInputEvent::ProviderResult {
        message: reply,
        observed_input_tokens: None,
        observed_output_tokens: None,
        now_ms: Some(100),
    };
    kernel.step(KernelInput::new(provider_result));

    let echo_result = ToolResult {
        call_id: "call-1".into(),
        output: crate::types::message::Content::Text("ok".into()),
        is_error: false,
        is_fatal: false,
        error_kind: None,
        token_count: None,
    };
    let tool_results = KernelInputEvent::ToolResults {
        results: vec![echo_result],
    };
    let outcome = kernel.step(KernelInput::new(tool_results));

    let tool_free_call = matches!(
        outcome.actions.as_slice(),
        [KernelAction::CallProvider { tools, .. }] if tools.is_empty()
    );
    assert!(tool_free_call);
}
1609
1610 fn assistant_calling(tool: &str) -> Message {
1613 let mut msg = Message::assistant("");
1614 msg.tool_calls.push(ToolCall {
1615 id: "call-1".into(),
1616 name: tool.into(),
1617 arguments: serde_json::json!({}),
1618 });
1619 msg
1620 }
1621
1622 fn run_with_tool_call(runtime: &mut KernelRuntime, tool: &str) -> KernelStep {
1624 run_with_tool_call_named(runtime, tool, "call-1")
1625 }
1626
1627 fn run_with_tool_call_named(
1628 runtime: &mut KernelRuntime,
1629 tool: &str,
1630 call_id: &str,
1631 ) -> KernelStep {
1632 runtime.step(KernelInput::new(KernelInputEvent::StartRun {
1633 task: RuntimeTask::new("do the thing"),
1634 run_spec: None,
1635 }));
1636 runtime.state_machine_mut().take_observations();
1637 runtime.step(KernelInput::new(KernelInputEvent::ProviderResult {
1638 message: assistant_calling(tool),
1639 observed_input_tokens: None,
1640 observed_output_tokens: None,
1641 now_ms: None,
1642 }))
1643 }
1644
/// A `Deny` rule matching the requested tool must produce a provider re-call
/// and a `Rollbacked` observation.
#[test]
fn governance_deny_blocks_tool_and_reprompts() {
    let deny_danger = PolicyRule {
        tool_pattern: "danger.*".to_string(),
        action: PolicyAction::Deny,
    };
    let policy = KernelInputEvent::LoadGovernancePolicy {
        default_action: Some(PolicyAction::Allow),
        rules: vec![deny_danger],
        vetoed_tools: vec![],
        rate_limits: vec![],
        constraints: vec![],
    };

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    kernel.step(KernelInput::new(policy));

    let outcome = run_with_tool_call(&mut kernel, "danger.delete");

    assert!(
        matches!(outcome.actions.as_slice(), [KernelAction::CallProvider { .. }]),
        "denied tool should roll back and re-call provider, got {:?}",
        outcome.actions
    );
    let rolled_back = outcome
        .observations
        .iter()
        .any(|obs| matches!(obs, KernelObservation::Rollbacked { .. }));
    assert!(
        rolled_back,
        "expected a Rollbacked observation for the denied turn",
    );
}
1674
/// Governance supplied through a `ConfigureRun` bundle must deny the tool the
/// same way the standalone policy input does: provider re-call plus a
/// `Rollbacked` observation.
#[test]
fn configure_run_bundle_applies_governance_equivalently_to_load_governance_policy() {
    let governance = GovernanceConfig {
        default_action: Some(PolicyAction::Allow),
        rules: vec![PolicyRule {
            tool_pattern: "danger.*".to_string(),
            action: PolicyAction::Deny,
        }],
        ..GovernanceConfig::default()
    };
    let config = RunConfig {
        tools: Some(vec![]),
        governance: Some(governance),
        attention_max_queue_size: Some(32),
        ..RunConfig::default()
    };

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    kernel.step(KernelInput::new(KernelInputEvent::ConfigureRun { config }));

    let outcome = run_with_tool_call(&mut kernel, "danger.delete");

    assert!(
        matches!(outcome.actions.as_slice(), [KernelAction::CallProvider { .. }]),
        "bundle-configured deny should roll back and re-call provider, got {:?}",
        outcome.actions
    );
    let rolled_back = outcome
        .observations
        .iter()
        .any(|obs| matches!(obs, KernelObservation::Rollbacked { .. }));
    assert!(
        rolled_back,
        "expected a Rollbacked observation for the bundle-denied turn",
    );
}
1710
/// A `ConfigureRun` event must survive a JSON serialize/deserialize round trip
/// and come back as the same variant.
#[test]
fn configure_run_round_trips_over_the_abi() {
    let resource_quota = crate::governance::quota::ResourceQuota {
        max_concurrent_subagents: Some(2),
        ..Default::default()
    };
    let config = RunConfig {
        resource_quota: Some(resource_quota),
        scheduler_max_wall_ms: Some(60_000),
        plan_tool_enabled: Some(true),
        ..RunConfig::default()
    };
    let event = KernelInputEvent::ConfigureRun { config };

    let wire = serde_json::to_string(&event).expect("serialize");
    let decoded: KernelInputEvent = serde_json::from_str(&wire).expect("deserialize");

    let same_variant = matches!(decoded, KernelInputEvent::ConfigureRun { .. });
    assert!(same_variant);
}
1730
/// An `AskUser` rule must gate the call and suspend with no actions; a
/// `Resume` that approves the call must then emit `ExecuteTool` and a matching
/// `Resumed` observation.
#[test]
fn governance_ask_user_suspends_until_resume() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let ask_on_sensitive = PolicyRule {
        tool_pattern: "sensitive.*".to_string(),
        action: PolicyAction::AskUser,
    };
    let policy = KernelInputEvent::LoadGovernancePolicy {
        default_action: Some(PolicyAction::Allow),
        rules: vec![ask_on_sensitive],
        vetoed_tools: vec![],
        rate_limits: vec![],
        constraints: vec![],
    };
    kernel.step(KernelInput::new(policy));

    // Phase 1: the gated call suspends the run.
    let gated = run_with_tool_call(&mut kernel, "sensitive.read");

    assert!(
        gated.actions.is_empty(),
        "AskUser should suspend without ExecuteTool, got {:?}",
        gated.actions
    );
    let saw_gate = gated.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::ToolGated { tool, .. } if tool == "sensitive.read"
        )
    });
    assert!(
        saw_gate,
        "expected a ToolGated observation for the AskUser call",
    );
    let saw_suspend = gated.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::Suspended { reason, .. } if reason == "ask_user"
        )
    });
    assert!(saw_suspend, "expected a Suspended observation");

    // Phase 2: approving the call resumes execution.
    let approval = KernelInputEvent::Resume {
        approved_calls: vec!["call-1".to_string()],
        denied_calls: vec![],
    };
    let resumed = kernel.step(KernelInput::new(approval));
    assert!(
        matches!(resumed.actions.as_slice(), [KernelAction::ExecuteTool { .. }]),
        "resume with approval should emit ExecuteTool, got {:?}",
        resumed.actions
    );
    let saw_resume = resumed.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::Resumed { approved, denied, .. }
                if approved == &["call-1"] && denied.is_empty()
        )
    });
    assert!(saw_resume);
}
1784
/// When the user denies every gated call, `Resume` must lead to a single
/// provider call.
#[test]
fn governance_ask_user_resume_all_denied_feeds_tool_results() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let ask_on_sensitive = PolicyRule {
        tool_pattern: "sensitive.*".to_string(),
        action: PolicyAction::AskUser,
    };
    let policy = KernelInputEvent::LoadGovernancePolicy {
        default_action: Some(PolicyAction::Allow),
        rules: vec![ask_on_sensitive],
        vetoed_tools: vec![],
        rate_limits: vec![],
        constraints: vec![],
    };
    kernel.step(KernelInput::new(policy));

    run_with_tool_call(&mut kernel, "sensitive.read");
    kernel.state_machine_mut().take_observations();

    let denial = KernelInputEvent::Resume {
        approved_calls: vec![],
        denied_calls: vec!["call-1".to_string()],
    };
    let outcome = kernel.step(KernelInput::new(denial));

    assert!(
        matches!(outcome.actions.as_slice(), [KernelAction::CallProvider { .. }]),
        "all denied should re-prompt provider, got {:?}",
        outcome.actions
    );
}
1811
/// With no governance policy loaded, the requested tool is executed and no
/// `ToolGated` observation appears.
#[test]
fn no_governance_policy_executes_all_tools() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let outcome = run_with_tool_call(&mut kernel, "danger.delete");

    let executes = matches!(
        outcome.actions.as_slice(),
        [KernelAction::ExecuteTool { .. }]
    );
    assert!(executes);

    let gated = outcome
        .observations
        .iter()
        .any(|obs| matches!(obs, KernelObservation::ToolGated { .. }));
    assert!(!gated);
}
1829
1830 fn tool_ok(call_id: &str) -> ToolResult {
1831 ToolResult {
1832 call_id: call_id.into(),
1833 output: crate::types::message::Content::Text("ok".to_string()),
1834 is_error: false,
1835 is_fatal: false,
1836 error_kind: None,
1837 token_count: None,
1838 }
1839 }
1840
/// With a limit of one `fetch` call per 60 000 ms window, the first call
/// executes and a second call one millisecond later is rolled back with a
/// provider re-call.
#[test]
fn governance_rate_limit_blocks_second_call() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let fetch_limit = RateLimitSpec {
        tool: "fetch".to_string(),
        max_calls: 1,
        window_ms: 60_000,
    };
    let policy = KernelInputEvent::LoadGovernancePolicy {
        default_action: Some(PolicyAction::Allow),
        rules: vec![],
        vetoed_tools: vec![],
        rate_limits: vec![fetch_limit],
        constraints: vec![],
    };
    kernel.step(KernelInput::new(policy));

    let start = KernelInputEvent::StartRun {
        task: RuntimeTask::new("fetch twice"),
        run_spec: None,
    };
    kernel.step(KernelInput::new(start));
    kernel.state_machine_mut().take_observations();

    // First call at t = 1000 ms: within the limit.
    let first_request = KernelInputEvent::ProviderResult {
        message: assistant_calling("fetch"),
        observed_input_tokens: None,
        observed_output_tokens: None,
        now_ms: Some(1_000),
    };
    let first = kernel.step(KernelInput::new(first_request));
    assert!(
        matches!(first.actions.as_slice(), [KernelAction::ExecuteTool { .. }]),
        "first call should execute, got {:?}",
        first.actions
    );

    let first_results = KernelInputEvent::ToolResults {
        results: vec![tool_ok("call-1")],
    };
    kernel.step(KernelInput::new(first_results));
    kernel.state_machine_mut().take_observations();

    // Second call at t = 1001 ms: same window, so it must be refused.
    let second_request = KernelInputEvent::ProviderResult {
        message: assistant_calling("fetch"),
        observed_input_tokens: None,
        observed_output_tokens: None,
        now_ms: Some(1_001),
    };
    let second = kernel.step(KernelInput::new(second_request));
    assert!(
        matches!(second.actions.as_slice(), [KernelAction::CallProvider { .. }]),
        "rate-limited call should roll back and re-call provider, got {:?}",
        second.actions
    );
    let rolled_back = second
        .observations
        .iter()
        .any(|obs| matches!(obs, KernelObservation::Rollbacked { .. }));
    assert!(
        rolled_back,
        "expected a Rollbacked observation for the rate-limited turn",
    );
}
1899
/// A `Required` constraint on `write`'s `path` must reject a call made with
/// empty arguments: provider re-call plus a `Rollbacked` observation.
#[test]
fn governance_constraint_required_param_denies() {
    let path_required = ConstraintSpec::Required {
        tool: "write".to_string(),
        path: "path".to_string(),
    };
    let policy = KernelInputEvent::LoadGovernancePolicy {
        default_action: Some(PolicyAction::Allow),
        rules: vec![],
        vetoed_tools: vec![],
        rate_limits: vec![],
        constraints: vec![path_required],
    };

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    kernel.step(KernelInput::new(policy));

    let outcome = run_with_tool_call(&mut kernel, "write");

    assert!(
        matches!(outcome.actions.as_slice(), [KernelAction::CallProvider { .. }]),
        "missing required param should roll back, got {:?}",
        outcome.actions
    );
    let rolled_back = outcome
        .observations
        .iter()
        .any(|obs| matches!(obs, KernelObservation::Rollbacked { .. }));
    assert!(
        rolled_back,
        "expected a Rollbacked observation for the constraint violation",
    );
}
1928
1929 fn signal(urgency: crate::types::signal::Urgency, summary: &str) -> crate::types::signal::RuntimeSignal {
1932 use crate::types::signal::{RuntimeSignal, SignalSource, SignalType};
1933 RuntimeSignal::new(SignalSource::Gateway, SignalType::Alert, urgency, summary)
1934 }
1935
1936 fn started_runtime_with_attention(max_queue: u32) -> KernelRuntime {
1937 let mut runtime = KernelRuntime::new(LoopPolicy::default());
1938 runtime.step(KernelInput::new(KernelInputEvent::SetAttentionPolicy {
1939 max_queue_size: max_queue,
1940 }));
1941 runtime.step(KernelInput::new(KernelInputEvent::StartRun {
1942 task: RuntimeTask::new("watch for signals"),
1943 run_spec: None,
1944 }));
1945 runtime.state_machine_mut().take_observations();
1946 runtime
1947 }
1948
/// A critical signal must trigger a provider call and be disposed as
/// `"interrupt_now"`.
#[test]
fn attention_policy_critical_signal_interrupts() {
    use crate::types::signal::Urgency;

    let mut kernel = started_runtime_with_attention(8);
    let alarm = KernelInputEvent::Signal {
        signal: signal(Urgency::Critical, "fire"),
    };
    let outcome = kernel.step(KernelInput::new(alarm));

    assert!(
        matches!(outcome.actions.as_slice(), [KernelAction::CallProvider { .. }]),
        "critical signal should drive a provider call, got {:?}",
        outcome.actions
    );
    let interrupted = outcome.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::SignalDisposed { disposition, .. } if disposition == "interrupt_now"
        )
    });
    assert!(interrupted);
}
1966
/// A normal-urgency signal must produce no actions and be disposed as
/// `"queue"` with a queue depth of one.
#[test]
fn attention_policy_normal_signal_queues_without_action() {
    use crate::types::signal::Urgency;

    let mut kernel = started_runtime_with_attention(8);
    let routine = KernelInputEvent::Signal {
        signal: signal(Urgency::Normal, "job"),
    };
    let outcome = kernel.step(KernelInput::new(routine));

    assert!(
        outcome.actions.is_empty(),
        "normal signal should queue without a provider call, got {:?}",
        outcome.actions
    );
    let queued_first = outcome.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::SignalDisposed { disposition, queue_depth, .. }
                if disposition == "queue" && *queue_depth == 1
        )
    });
    assert!(queued_first);
}
1985
/// With the queue capped at one entry, a second normal signal must be
/// disposed as `"dropped"`.
#[test]
fn attention_policy_full_queue_drops() {
    use crate::types::signal::Urgency;

    let mut kernel = started_runtime_with_attention(1);

    // The first signal takes the only queue slot.
    let first = KernelInputEvent::Signal {
        signal: signal(Urgency::Normal, "first"),
    };
    kernel.step(KernelInput::new(first));

    let second = KernelInputEvent::Signal {
        signal: signal(Urgency::Normal, "second"),
    };
    let outcome = kernel.step(KernelInput::new(second));

    let dropped = outcome.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::SignalDisposed { disposition, .. } if disposition == "dropped"
        )
    });
    assert!(dropped);
}
2001
/// After memory is enabled, a `PageIn` input must increase the number of
/// messages held in the knowledge partition.
///
/// The function previously carried two `#[test]` attributes; the duplicate is
/// removed so the item is registered once and the build stays warning-free.
#[test]
fn page_in_populates_knowledge_partition() {
    let mut runtime = KernelRuntime::new(LoopPolicy::default());
    runtime.step(KernelInput::new(KernelInputEvent::SetMemoryEnabled {
        enabled: true,
    }));

    // Snapshot the partition size so the assertion is relative, not absolute.
    let before = runtime
        .state_machine()
        .ctx
        .partitions
        .knowledge
        .messages
        .len();

    runtime.step(KernelInput::new(KernelInputEvent::PageIn {
        entries: vec![crate::mm::PageInEntry {
            content: "[memory] prior fix".to_string(),
            tokens: Some(10),
            source: Some("memory".to_string()),
        }],
    }));

    let after = runtime
        .state_machine()
        .ctx
        .partitions
        .knowledge
        .messages
        .len();
    assert!(after > before, "page-in should add knowledge messages");
}
2032
/// With memory enabled, a provider reply that calls the `memory` tool must
/// surface a `PageInRequested` observation naming that tool.
#[test]
fn memory_tool_emits_page_in_requested() {
    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let enable_memory = KernelInputEvent::SetMemoryEnabled { enabled: true };
    kernel.step(KernelInput::new(enable_memory));

    // NOTE(review): `run_with_tool_call` issues its own `StartRun` as well, so
    // the run is started twice here — confirm whether this first start is needed.
    let start = KernelInputEvent::StartRun {
        task: RuntimeTask::new("test"),
        run_spec: None,
    };
    kernel.step(KernelInput::new(start));
    kernel.state_machine_mut().take_observations();

    let outcome = run_with_tool_call(&mut kernel, "memory");

    let page_in_requested = outcome.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::PageInRequested { tool, .. } if tool == "memory"
        )
    });
    assert!(page_in_requested);
}
2051
/// Drives a two-worker fan-out plus synthesis workflow through the ABI:
/// `LoadWorkflow` (after a JSON round trip) must spawn the two workers, their
/// completion must spawn the third node, and its completion must report the
/// workflow as completed with three nodes.
#[test]
fn load_workflow_input_drives_dag_to_completion() {
    use crate::orchestration::workflow::fanout_synthesize;
    use crate::types::result::{LoopResult, SubAgentResult, TerminationReason};

    // Reports sub-agent `id` as completed after one turn and one token.
    fn complete(runtime: &mut KernelRuntime, id: &str) -> KernelStep {
        let finished = SubAgentResult {
            agent_id: compact_str::CompactString::new(id),
            result: LoopResult {
                termination: TerminationReason::Completed,
                final_message: None,
                turns_used: 1,
                total_tokens_used: 1,
                loop_continue: None,
                classify_branch: None,
                tournament_winner: None,
            },
        };
        runtime.step(KernelInput::new(KernelInputEvent::SubAgentCompleted {
            result: finished,
        }))
    }

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let start = KernelInputEvent::StartRun {
        task: RuntimeTask::new("parent task"),
        run_spec: None,
    };
    kernel.step(KernelInput::new(start));
    kernel.state_machine_mut().take_observations();

    let workers = vec![RuntimeTask::new("w0"), RuntimeTask::new("w1")];
    let spec = fanout_synthesize(workers, RuntimeTask::new("synth"));
    let event = KernelInputEvent::LoadWorkflow {
        spec,
        parent_session_id: "sess".to_string(),
        resumed_completed: Vec::new(),
        resumed_submissions: Vec::new(),
    };
    // Round-trip through JSON so the input is exercised in its wire form.
    let wire = serde_json::to_string(&event).expect("serialize");
    let decoded: KernelInputEvent = serde_json::from_str(&wire).expect("deserialize");

    let loaded = kernel.step(KernelInput::new(decoded));
    let batch = loaded
        .observations
        .iter()
        .find_map(|obs| {
            if let KernelObservation::WorkflowBatchSpawned { nodes, .. } = obs {
                Some(nodes.clone())
            } else {
                None
            }
        })
        .expect("workflow_batch_spawned");
    assert_eq!(batch.len(), 2);
    let goals: Vec<&str> = batch.iter().map(|node| node.goal.as_str()).collect();
    assert!(goals.contains(&"w0") && goals.contains(&"w1"));
    assert_eq!(batch[0].agent_id, "wf-node0");
    assert_eq!(batch[0].isolation, "read_only");

    complete(&mut kernel, "wf-node0");
    let after_workers = complete(&mut kernel, "wf-node1");
    let spawned_third = after_workers.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::WorkflowBatchSpawned { nodes, .. }
                if nodes.len() == 1 && nodes[0].agent_id == "wf-node2"
        )
    });
    assert!(spawned_third);

    let after_third = complete(&mut kernel, "wf-node2");
    let all_done = after_third.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::WorkflowCompleted { completed, .. } if completed.len() == 3
        )
    });
    assert!(all_done);
}
2125
/// `LoadWorkflow` on a runtime that never received `StartRun` must still spawn
/// the first batch and run the three-node workflow to completion.
#[test]
fn load_workflow_self_bootstraps_with_no_prior_start_run() {
    use crate::orchestration::workflow::fanout_synthesize;
    use crate::types::result::{LoopResult, SubAgentResult, TerminationReason};

    let mut kernel = KernelRuntime::new(LoopPolicy::default());

    let workers = vec![RuntimeTask::new("w0"), RuntimeTask::new("w1")];
    let spec = fanout_synthesize(workers, RuntimeTask::new("synth"));
    let load = KernelInputEvent::LoadWorkflow {
        spec,
        parent_session_id: "sess".to_string(),
        resumed_completed: Vec::new(),
        resumed_submissions: Vec::new(),
    };
    let loaded = kernel.step(KernelInput::new(load));

    let batch = loaded
        .observations
        .iter()
        .find_map(|obs| {
            if let KernelObservation::WorkflowBatchSpawned { nodes, .. } = obs {
                Some(nodes.clone())
            } else {
                None
            }
        })
        .expect("workflow_batch_spawned even without a prior StartRun");
    assert_eq!(batch.len(), 2);

    // Reports the named sub-agent as completed after one turn and one token.
    let finish = |runtime: &mut KernelRuntime, id: &str| {
        let result = LoopResult {
            termination: TerminationReason::Completed,
            final_message: None,
            turns_used: 1,
            total_tokens_used: 1,
            loop_continue: None,
            classify_branch: None,
            tournament_winner: None,
        };
        let finished = SubAgentResult {
            agent_id: compact_str::CompactString::new(id),
            result,
        };
        runtime.step(KernelInput::new(KernelInputEvent::SubAgentCompleted {
            result: finished,
        }))
    };

    finish(&mut kernel, "wf-node0");
    finish(&mut kernel, "wf-node1");
    let last = finish(&mut kernel, "wf-node2");

    let all_done = last.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::WorkflowCompleted { completed, .. } if completed.len() == 3
        )
    });
    assert!(all_done);
}
2179
/// After a single-node workflow is loaded, a `SubmitWorkflowNodes` input
/// (round-tripped through JSON) must spawn the extra node as `wf-node1`, and
/// completing both nodes must report the workflow as completed with two nodes.
#[test]
fn submit_workflow_nodes_input_appends_a_node_over_the_abi() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;
    use crate::types::result::{LoopResult, SubAgentResult, TerminationReason};

    // Reports sub-agent `id` as completed after one turn and one token.
    fn complete(runtime: &mut KernelRuntime, id: &str) -> KernelStep {
        let finished = SubAgentResult {
            agent_id: compact_str::CompactString::new(id),
            result: LoopResult {
                termination: TerminationReason::Completed,
                final_message: None,
                turns_used: 1,
                total_tokens_used: 1,
                loop_continue: None,
                classify_branch: None,
                tournament_winner: None,
            },
        };
        runtime.step(KernelInput::new(KernelInputEvent::SubAgentCompleted {
            result: finished,
        }))
    }

    let mut kernel = KernelRuntime::new(LoopPolicy::default());
    let start = KernelInputEvent::StartRun {
        task: RuntimeTask::new("parent task"),
        run_spec: None,
    };
    kernel.step(KernelInput::new(start));
    kernel.state_machine_mut().take_observations();

    let root = WorkflowNode::new(RuntimeTask::new("root"), AgentRole::Implement);
    let load = KernelInputEvent::LoadWorkflow {
        spec: WorkflowSpec::new(vec![root]),
        parent_session_id: "sess".to_string(),
        resumed_completed: Vec::new(),
        resumed_submissions: Vec::new(),
    };
    kernel.step(KernelInput::new(load));
    kernel.state_machine_mut().take_observations();

    let extra = WorkflowNode::new(RuntimeTask::new("more"), AgentRole::Implement);
    let submit = KernelInputEvent::SubmitWorkflowNodes {
        nodes: vec![extra],
        submitter_agent_id: None,
    };
    // Round-trip through JSON so the input is exercised in its wire form.
    let wire = serde_json::to_string(&submit).expect("serialize");
    let decoded: KernelInputEvent = serde_json::from_str(&wire).expect("deserialize");
    let submitted = kernel.step(KernelInput::new(decoded));

    let spawned_extra = submitted.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::WorkflowBatchSpawned { nodes, .. }
                if nodes.len() == 1 && nodes[0].agent_id == "wf-node1" && nodes[0].goal == "more"
        )
    });
    assert!(spawned_extra);

    complete(&mut kernel, "wf-node0");
    let last = complete(&mut kernel, "wf-node1");
    let all_done = last.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::WorkflowCompleted { completed, .. } if completed.len() == 2
        )
    });
    assert!(all_done);
}
2247
/// Sends a `SubmitWorkflow` event through a JSON round-trip into a runtime
/// that has only been started with a parent task.
///
/// The test expects the single authored node to be spawned as `wf-node0` with
/// goal `"authored root"`, and the workflow to report one completed node once
/// that sub-agent finishes.
#[test]
fn submit_workflow_input_bootstraps_a_dag_over_the_abi() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;
    use crate::types::result::{LoopResult, SubAgentResult, TerminationReason};

    let mut runtime = KernelRuntime::new(LoopPolicy::default());
    let start = KernelInputEvent::StartRun {
        task: RuntimeTask::new("parent task"),
        run_spec: None,
    };
    runtime.step(KernelInput::new(start));
    // Discard the observations produced so far; only later steps are inspected.
    runtime.state_machine_mut().take_observations();

    // Serialize and re-parse the event so the serde representation is exercised too.
    let authored = WorkflowNode::new(RuntimeTask::new("authored root"), AgentRole::Implement);
    let submission = KernelInputEvent::SubmitWorkflow {
        spec: WorkflowSpec::new(vec![authored]),
        parent_session_id: "sess".to_string(),
        submitter_agent_id: None,
    };
    let wire = serde_json::to_string(&submission).expect("serialize");
    let decoded: KernelInputEvent = serde_json::from_str(&wire).expect("deserialize");
    let after_submit = runtime.step(KernelInput::new(decoded));

    let root_spawned = after_submit.observations.iter().any(|obs| match obs {
        KernelObservation::WorkflowBatchSpawned { nodes, .. } => {
            nodes.len() == 1
                && nodes[0].agent_id == "wf-node0"
                && nodes[0].goal == "authored root"
        }
        _ => false,
    });
    assert!(root_spawned);

    // Report the only node as completed after a single turn.
    let root_result = SubAgentResult {
        agent_id: compact_str::CompactString::new("wf-node0"),
        result: LoopResult {
            termination: TerminationReason::Completed,
            final_message: None,
            turns_used: 1,
            total_tokens_used: 1,
            loop_continue: None,
            classify_branch: None,
            tournament_winner: None,
        },
    };
    let after_completion = runtime.step(KernelInput::new(KernelInputEvent::SubAgentCompleted {
        result: root_result,
    }));

    let workflow_completed = after_completion.observations.iter().any(|obs| match obs {
        KernelObservation::WorkflowCompleted { completed, .. } => completed.len() == 1,
        _ => false,
    });
    assert!(workflow_completed);
}
2302
/// Loads a `fanout_synthesize` workflow while declaring `wf-node0` as already
/// completed via `resumed_completed`.
///
/// The test expects the resulting `WorkflowBatchSpawned` observation to hold
/// exactly one node, whose agent id is `wf-node1`.
#[test]
fn load_workflow_resumes_from_completed_nodes() {
    use crate::orchestration::workflow::fanout_synthesize;

    let mut runtime = KernelRuntime::new(LoopPolicy::default());
    let start = KernelInputEvent::StartRun {
        task: RuntimeTask::new("parent task"),
        run_spec: None,
    };
    runtime.step(KernelInput::new(start));
    // Discard the observations produced so far; only the load step is inspected.
    runtime.state_machine_mut().take_observations();

    let workers = vec![RuntimeTask::new("w0"), RuntimeTask::new("w1")];
    let spec = fanout_synthesize(workers, RuntimeTask::new("synth"));
    let resume = KernelInputEvent::LoadWorkflow {
        spec,
        parent_session_id: "sess".to_string(),
        resumed_completed: vec!["wf-node0".to_string()],
        resumed_submissions: Vec::new(),
    };
    let after_load = runtime.step(KernelInput::new(resume));

    // Take the node list from the first batch-spawn observation.
    let spawned = after_load
        .observations
        .iter()
        .find_map(|obs| {
            if let KernelObservation::WorkflowBatchSpawned { nodes, .. } = obs {
                Some(nodes.clone())
            } else {
                None
            }
        })
        .expect("workflow_batch_spawned");

    assert_eq!(spawned.len(), 1);
    assert_eq!(spawned[0].agent_id, "wf-node1");
}
2336}