1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
//! Sub-agent process lifecycle impl for [`super::LoopStateMachine`].
use super::{KernelObservation, LoopAction, LoopPhase, LoopStateMachine, SuspendState};
use super::super::tcb::{TaskState, TaskTable, Tcb, WaitReason};
use crate::proc::AgentProcess;
use crate::runtime::session::RollbackReason;
use crate::syscall::{Disposition, Syscall};
use crate::types::message::Message;
use crate::types::result::{SubAgentResult, TerminationReason};
use crate::AgentRunSpec;
impl LoopStateMachine {
/// Spawn a sub-agent: registers a kernel process, emits `AgentProcessChanged`,
/// and enters `Suspended(SubAgentAwait)` until the SDK feeds `SubAgentCompleted`.
pub fn spawn_sub_agent(
&mut self,
spec: AgentRunSpec,
parent_session_id: &str,
) -> LoopAction {
let manifest = crate::types::agent::IsolationManifest::from_spec(
&spec,
parent_session_id,
&self.ctx.capabilities,
);
// M2b: spawning is an effectful request — route it through the same syscall trap as tool
// calls. No spawn policy stages exist yet, so this defaults to `Allow`; a `Deny` rolls the
// turn back exactly like a denied tool call. Establishing the chokepoint now means quotas /
// spawn rules can attach later without a new code path.
if let Disposition::Deny { reason, .. } =
self.evaluate_syscall(&Syscall::Spawn(manifest.clone()))
{
let rb = RollbackReason::GovernanceDenied {
tool_name: format!("spawn:{}", manifest.agent_id),
reason,
};
let note = Message::user(super::super::rollback::build_rollback_note(
&rb,
self.ctx.config.verbose_control_notes,
));
self.rollback(rb);
self.ctx
.push_signal(note.content.as_text().unwrap_or_default().to_string());
self.phase = LoopPhase::Reason;
return self.emit_call_llm();
}
let agent_id = manifest.agent_id.to_string();
// M1 収口: register the sub-agent as a child task — the single source of truth. The
// `AgentProcess` view row is reconstructed from the TCB for the observation/session-log.
let child = Tcb::spawned(&manifest, self.policy.clone());
self.tasks.insert(child);
if let Some(process) = self
.tasks
.get(&agent_id)
.and_then(AgentProcess::from_tcb)
{
self.push_agent_process_changed(process);
}
self.suspend_state = Some(SuspendState::SubAgentAwait {
agent_ids: vec![agent_id.clone()],
});
self.set_lifecycle(
TaskState::Suspended,
Some(WaitReason::SubAgentJoin(vec![manifest.agent_id.clone()])),
);
self.observations.push(KernelObservation::Suspended {
turn: self.turn,
reason: "sub_agent_await".to_string(),
pending_calls: vec![agent_id],
});
LoopAction::AwaitingResume
}
pub(super) fn handle_sub_agent_completed(&mut self, result: SubAgentResult) -> LoopAction {
// M1 収口: record the join on the child task itself (the source of truth) — both the
// terminal lifecycle and the result payload — then rebuild the `AgentProcess` view row.
// The terminal `TaskState` preserves the legacy `ProcessState`→`TaskState` mapping
// (`Completed`→`Done(Completed)`, anything else→`Done(Error)`).
let process = if let Some(task) = self.tasks.get_mut(result.agent_id.as_str()) {
let process_state = match result.result.termination {
TerminationReason::Completed => crate::proc::ProcessState::Joined,
_ => crate::proc::ProcessState::Failed,
};
task.state = TaskState::from(process_state);
if let Some(info) = task.proc.as_mut() {
info.result = Some(result.clone());
}
AgentProcess::from_tcb(task)
} else {
None
};
if let Some(process) = process {
self.push_agent_process_changed(process);
}
let summary = result
.result
.final_message
.as_ref()
.and_then(|m| m.content.as_text())
.unwrap_or_default();
// R3-3 cross-boundary provenance: a quarantined node read untrusted content, so its output
// crossing into the trusted parent context is labeled as untrusted-origin. The kernel
// enforces the *label* (auditable, machine-checkable); shaping the output into a structured
// summary stays the SDK's job, since the kernel cannot inspect content shape.
let quarantined = self
.workflow
.as_ref()
.is_some_and(|w| w.is_agent_quarantined(result.agent_id.as_str()));
let marker = if quarantined { "quarantined sub-agent" } else { "sub-agent" };
self.ctx
.push_signal(format!("[{marker} {}] {}", result.agent_id, summary));
// W0: if a workflow owns this agent, advance its DAG (feed completion, drain the batch,
// spawn the next gated batch or finish) instead of the single-spawn barrier below.
if self
.workflow
.as_ref()
.is_some_and(|w| w.owns_agent(result.agent_id.as_str()))
{
return self.advance_workflow(result);
}
let agent_id = result.agent_id.to_string();
// Suspended awaiting a sub-agent join (lifecycle on the root task, M1d).
let awaiting_sub_agent =
self.is_suspended() && matches!(self.wait_reason(), Some(WaitReason::SubAgentJoin(_)));
let resume_parent = match self.suspend_state.as_mut() {
Some(SuspendState::SubAgentAwait { agent_ids }) if awaiting_sub_agent => {
agent_ids.retain(|id| id != &agent_id);
if agent_ids.is_empty() {
self.suspend_state = None;
self.observations.push(KernelObservation::Resumed {
turn: self.turn,
approved: vec![agent_id],
denied: Vec::new(),
});
true
} else {
false
}
}
_ => true,
};
if resume_parent {
self.phase = LoopPhase::Reason;
self.emit_call_llm()
} else {
LoopAction::AwaitingResume
}
}
/// The `AgentProcess` view of a sub-agent, reconstructed from its child task. `None` for the
/// root task or unknown ids. (M1 収口: derived from the `TaskTable`, no separate storage.)
pub fn agent_process(&self, agent_id: &str) -> Option<AgentProcess> {
self.tasks.get(agent_id).and_then(AgentProcess::from_tcb)
}
/// The `AgentProcess` view of all sub-agents (every child task with process identity).
pub fn agent_processes(&self) -> Vec<AgentProcess> {
self.tasks
.all()
.iter()
.filter_map(AgentProcess::from_tcb)
.collect()
}
/// The canonical task registry (root task + one row per sub-agent): the single source of
/// truth for schedulability *and* sub-agent lineage. `agent_process(es)` are derived views
/// over this table (M1 収口).
pub fn task_table(&self) -> &TaskTable {
&self.tasks
}
/// Emit an `AgentProcessChanged` observation for a process state transition.
pub(super) fn push_agent_process_changed(&mut self, process: AgentProcess) {
// Wire form: role/isolation/inheritance are debug-lowercase (`readonly`, `systemonly`),
// state via `label()`. Preserved verbatim from the former `From<LoopObservation>` so the
// observation merge stays byte-identical (locked by `agent_process_changed_locks_*` test).
self.observations.push(KernelObservation::AgentProcessChanged {
turn: self.turn,
agent_id: process.agent_id.to_string(),
parent_session_id: process.parent_session_id.to_string(),
role: format!("{:?}", process.role).to_lowercase(),
isolation: format!("{:?}", process.isolation).to_lowercase(),
context_inheritance: format!("{:?}", process.context_inheritance).to_lowercase(),
state: process.state.label().to_string(),
permitted_capability_ids: process
.permitted_capability_ids
.iter()
.map(|id| id.to_string())
.collect(),
result_termination: process.result_termination_label().map(str::to_string),
});
}
}