1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
//! Workflow orchestration impl for [`super::LoopStateMachine`].
use super::{KernelObservation, LoopAction, LoopPhase, LoopStateMachine, SuspendState};
use super::super::tcb::{TaskState, Tcb, WaitReason};
use crate::proc::AgentProcess;
use crate::runtime::session::RollbackReason;
use crate::syscall::{Disposition, Syscall};
use crate::types::message::Message;
use crate::types::result::SubAgentResult;
use crate::types::task::RuntimeTask;
impl LoopStateMachine {
/// Whether a workflow DAG is currently in flight.
pub fn workflow_active(&self) -> bool {
self.workflow.is_some()
}
/// K1: bring the host `LoadWorkflow` path to parity with the agent-reachable `SubmitWorkflow`,
/// which already bootstraps a run when none is active. If the root task is still `Ready` (the host
/// never fired `StartRun` — e.g. a stateless `runWorkflow` caller), run the same initialization
/// `StartRun` performs so the DAG installs onto a live run. No-op once the root has left `Ready`
/// (host started it, or we already bootstrapped), so it is safe to call unconditionally and carries
/// no behavior change for runs that were started normally. The `CallLLM` action `start` returns is
/// discarded — the workflow drives nodes, not a root provider turn, exactly as the SDK's previous
/// `start_run` + `load_workflow` sequence already did.
pub fn ensure_started_for_workflow(&mut self, spec: &crate::orchestration::workflow::WorkflowSpec) {
if !matches!(self.lifecycle(), TaskState::Ready) {
return;
}
let goal = format!("workflow:{} nodes", spec.nodes.len());
let _ = self.start(RuntimeTask::new(goal));
}
/// W0: load a workflow DAG and spawn its first gated batch. On an invalid spec (cycle /
/// out-of-range dependency) the workflow is not installed and the loop continues with a
/// rollback note, mirroring how a denied effect is surfaced.
pub fn load_workflow(
&mut self,
spec: crate::orchestration::workflow::WorkflowSpec,
parent_session_id: &str,
) -> LoopAction {
self.install_workflow(crate::orchestration::workflow::WorkflowRun::new(
&spec,
parent_session_id,
))
}
/// R3-1: append nodes to the in-flight workflow DAG at runtime, then drive one gated spawn round
/// so any now-ready node starts immediately (alongside the still-running submitter). The append
/// is pure graph mutation; each appended node's *spawn* still passes through the spawn gate in
/// [`Self::spawn_ready_workflow_nodes`] (quota / depth / quarantine), so this adds no new gate and
/// can't outrun the concurrency cap. No active workflow (or an empty submission) → a no-op that
/// leaves the current suspension untouched.
pub fn submit_workflow_nodes(
&mut self,
nodes: Vec<crate::orchestration::workflow::WorkflowNode>,
submitter_agent_id: Option<&str>,
) -> LoopAction {
if nodes.is_empty() || self.workflow.is_none() {
return LoopAction::AwaitingResume;
}
// R3-1 governance: gate DAG growth through the syscall trap. A `max_workflow_nodes` quota
// denies a submission that would grow the workflow past the cap (runaway loop-until-done
// backstop); the workflow continues with its existing nodes and a rollback note is surfaced.
let disposition = self.evaluate_syscall(&Syscall::SubmitNodes { count: nodes.len() });
if !disposition.is_allowed() {
let reason = match &disposition {
Disposition::Deny { reason, .. } => reason.clone(),
_ => "workflow node submission denied".to_string(),
};
let rb = RollbackReason::GovernanceDenied {
tool_name: "submit_workflow_nodes".to_string(),
reason,
};
let note = Message::user(super::super::rollback::build_rollback_note(
&rb,
self.ctx.config.verbose_control_notes,
));
self.ctx
.push_signal(note.content.as_text().unwrap_or_default().to_string());
return LoopAction::AwaitingResume;
}
if let Some(run) = self.workflow.as_mut() {
// G1: route through the trust-aware entry point — a quarantined submitter's nodes are
// coerced to quarantined in-kernel before append (no topological privilege escalation).
run.submit_nodes_from(submitter_agent_id, nodes);
}
self.drive_workflow(None)
}
/// M5/G1: an agent authors a whole `WorkflowSpec` (the article's "model writes its own harness").
/// **Bootstrap-or-flatten** (one DAG, unified governance — never a workflow stack):
/// - **No workflow active** (top-level agent) ⇒ *bootstrap* the DAG via `install_workflow`, exactly
/// like the host-only `load_workflow`, but agent-reachable through the syscall trap.
/// - **Workflow active** (caller is a node) ⇒ *flatten*: append the spec's nodes through the same
/// trust-aware `submit_nodes_from` as `submit_workflow_nodes` (a spec is just a node batch).
///
/// Gated by `Syscall::LoadWorkflow` (the same `max_workflow_nodes` backstop as `SubmitNodes`), so an
/// authored harness cannot overgrow the DAG. A second author while a workflow is active flattens —
/// it never stacks — so there is no unbounded recursion of kernels. Empty spec → no-op.
pub fn submit_workflow(
&mut self,
spec: crate::orchestration::workflow::WorkflowSpec,
parent_session_id: &str,
submitter_agent_id: Option<&str>,
) -> LoopAction {
if spec.nodes.is_empty() {
return LoopAction::AwaitingResume;
}
let disposition = self.evaluate_syscall(&Syscall::LoadWorkflow {
node_count: spec.nodes.len(),
});
if !disposition.is_allowed() {
let reason = match &disposition {
Disposition::Deny { reason, .. } => reason.clone(),
_ => "workflow authoring denied".to_string(),
};
let rb = RollbackReason::GovernanceDenied {
tool_name: "start_workflow".to_string(),
reason,
};
let note = Message::user(super::super::rollback::build_rollback_note(
&rb,
self.ctx.config.verbose_control_notes,
));
self.ctx
.push_signal(note.content.as_text().unwrap_or_default().to_string());
return LoopAction::AwaitingResume;
}
match self.workflow.as_mut() {
// Flatten: caller is a workflow node; grow the existing DAG (G1 coercion applies).
Some(run) => {
run.submit_nodes_from(submitter_agent_id, spec.nodes);
self.drive_workflow(None)
}
// Bootstrap: top-level agent starts a brand-new workflow in this same kernel.
None => self.install_workflow(crate::orchestration::workflow::WorkflowRun::new(
&spec,
parent_session_id,
)),
}
}
/// W0-ABI resume: load a workflow whose listed node agent-ids already completed (recovered from
/// the session log after an interruption); the kernel continues the DAG from the remaining work.
pub fn load_workflow_resumed(
&mut self,
spec: crate::orchestration::workflow::WorkflowSpec,
parent_session_id: &str,
submissions: &[Vec<crate::orchestration::workflow::WorkflowNode>],
completed: &[String],
) -> LoopAction {
self.install_workflow(crate::orchestration::workflow::WorkflowRun::resume(
&spec,
parent_session_id,
submissions,
completed,
))
}
fn install_workflow(
&mut self,
built: crate::types::error::Result<crate::orchestration::workflow::WorkflowRun>,
) -> LoopAction {
match built {
Ok(run) => {
self.workflow = Some(run);
self.drive_workflow(None)
}
Err(err) => {
let rb = RollbackReason::GovernanceDenied {
tool_name: "load_workflow".to_string(),
reason: err.to_string(),
};
let note = Message::user(super::super::rollback::build_rollback_note(
&rb,
self.ctx.config.verbose_control_notes,
));
self.ctx
.push_signal(note.content.as_text().unwrap_or_default().to_string());
self.phase = LoopPhase::Reason;
self.emit_call_llm()
}
}
}
/// Spawn every workflow node that is **ready now and fits under the concurrency cap**, each
/// gated through the *deferrable* spawn quota. A transient concurrency limit (`Defer`) stops
/// the round and leaves the remaining ready nodes untouched — a running sibling's completion
/// will free a slot and the next [`Self::drive_workflow`] round retries them (W2-1 収口: quota
/// backpressure = enqueue-and-retry, not permanent denial). A permanent limit (`Deny`, e.g.
/// depth) marks the node failed so its dependents starve. Returns the freshly spawned ids and
/// their `WorkflowSpawnInfo` (for the `WorkflowBatchSpawned` observation).
fn spawn_ready_workflow_nodes(
&mut self,
) -> (Vec<String>, Vec<crate::orchestration::workflow::WorkflowSpawnInfo>) {
// A2 tournament: a controller node whose deps are satisfied fans out into entrant children
// (and spawns no agent of its own) before we read the ready set — so its entrants/judges
// are picked up by the same run-queue spawn loop as any other node.
if let Some(run) = self.workflow.as_mut() {
run.expand_ready_controllers();
}
let ready = self
.workflow
.as_ref()
.map(|w| w.ready_batch())
.unwrap_or_default();
let mut spawned_ids: Vec<String> = Vec::new();
let mut spawned_infos: Vec<crate::orchestration::workflow::WorkflowSpawnInfo> = Vec::new();
for node in ready {
// W3 quarantine stage: a quarantined node that declares write privilege is a contradiction
// (it reads untrusted content) — deny the spawn in-kernel and starve its dependents, rather
// than trusting the SDK to honor read-only. Equivalent to `Deny{stage:"quarantine"}`.
if self.workflow.as_ref().is_some_and(|w| w.quarantine_violation(node)) {
if let Some(run) = self.workflow.as_mut() {
run.mark_denied(node);
}
let rb = RollbackReason::GovernanceDenied {
tool_name: format!(
"workflow-node:{}",
crate::orchestration::workflow::node_agent_id(node)
),
reason: "quarantine: quarantined node requested write-capable isolation".to_string(),
};
let note = Message::user(super::super::rollback::build_rollback_note(
&rb,
self.ctx.config.verbose_control_notes,
));
self.ctx
.push_signal(note.content.as_text().unwrap_or_default().to_string());
continue;
}
// Owned manifest — releases the immutable `self.workflow` borrow before the gate.
let manifest = match self.workflow.as_ref() {
Some(w) => w.manifest_for(node),
None => continue,
};
match self.evaluate_spawn_quota_deferrable() {
Disposition::Allow => {
let agent_id = manifest.agent_id.to_string();
let child = Tcb::spawned(&manifest, self.policy.clone());
self.tasks.insert(child);
if let Some(process) = self.tasks.get(&agent_id).and_then(AgentProcess::from_tcb) {
self.push_agent_process_changed(process);
}
if let Some(run) = self.workflow.as_mut() {
run.mark_spawned(node, &agent_id);
}
if let Some(run) = self.workflow.as_ref() {
spawned_infos.push(run.spawn_info(node));
}
spawned_ids.push(agent_id);
}
Disposition::Defer { .. } => {
// Concurrency cap reached: leave this node (and the rest of this round) Ready;
// the scheduler retries them once a running sibling frees a slot.
break;
}
_ => {
// Permanent denial (e.g. depth limit): the node fails; dependents starve.
if let Some(run) = self.workflow.as_mut() {
run.mark_denied(node);
}
}
}
}
(spawned_ids, spawned_infos)
}
/// Run-queue workflow executor (W2-1 収口 — the default, replacing the old batch barrier). Spawns
/// every currently-runnable ready node, then suspends on the running set or finishes. Unlike the
/// batch barrier, a node's dependents can start the moment *that* node completes, without waiting
/// for the slowest sibling in its dependency layer. For DAGs with no intra-layer skew
/// (fanout/linear) the spawn sequence is identical to the old batch path. `just_completed` is the
/// node whose completion triggered this round (`None` on the initial install).
fn drive_workflow(&mut self, just_completed: Option<String>) -> LoopAction {
// Drop the just-completed node from the running set (its TCB is already terminal).
if let Some(id) = just_completed.as_deref() {
if let Some(SuspendState::SubAgentAwait { agent_ids }) = self.suspend_state.as_mut() {
agent_ids.retain(|a| a != id);
}
}
// Spawn everything ready that fits under the concurrency cap right now.
let (spawned_ids, spawned_infos) = self.spawn_ready_workflow_nodes();
if !spawned_ids.is_empty() {
// G4: snapshot remaining budget *after* this batch's spawns are reflected in the running
// set, so a coordinator node reads accurate headroom for its next submission.
let budget = self.workflow_budget();
// W0-ABI: tell the SDK which nodes to run (with their goals) before suspending.
self.observations.push(KernelObservation::WorkflowBatchSpawned {
turn: self.turn,
nodes: spawned_infos,
budget,
});
match self.suspend_state.as_mut() {
Some(SuspendState::SubAgentAwait { agent_ids }) => {
agent_ids.extend(spawned_ids.iter().cloned());
}
_ => {
self.suspend_state = Some(SuspendState::SubAgentAwait {
agent_ids: spawned_ids.clone(),
});
}
}
let wait_ids: Vec<crate::scheduler::tcb::TaskId> = match &self.suspend_state {
Some(SuspendState::SubAgentAwait { agent_ids }) => {
agent_ids.iter().map(|s| s.clone().into()).collect()
}
_ => Vec::new(),
};
self.set_lifecycle(TaskState::Suspended, Some(WaitReason::SubAgentJoin(wait_ids)));
self.observations.push(KernelObservation::Suspended {
turn: self.turn,
reason: "workflow_batch".to_string(),
pending_calls: spawned_ids,
});
}
// Still nodes running? keep awaiting their completions.
let running = matches!(
self.suspend_state.as_ref(),
Some(SuspendState::SubAgentAwait { agent_ids }) if !agent_ids.is_empty()
);
if running {
return LoopAction::AwaitingResume;
}
// Nothing running and nothing newly spawned → the DAG is done, or stalled because a
// gated/denied dependency starves its dependents. Resume the parent loop.
self.suspend_state = None;
if let Some(id) = just_completed {
self.observations.push(KernelObservation::Resumed {
turn: self.turn,
approved: vec![id],
denied: Vec::new(),
});
}
self.finish_workflow()
}
/// Finish the in-flight workflow: emit `WorkflowCompleted` with its outcome, clear it, and
/// resume the parent loop. Shared by the all-gated path and the drained-no-more-ready path.
fn finish_workflow(&mut self) -> LoopAction {
if let Some(run) = self.workflow.as_ref() {
let (completed, failed) = run.outcome();
self.observations.push(KernelObservation::WorkflowCompleted {
turn: self.turn,
completed,
failed,
});
}
self.workflow = None;
self.phase = LoopPhase::Reason;
self.emit_call_llm()
}
/// W0/W2-1: advance the in-flight workflow after a node completed. Records the completion, then
/// hands off to the run-queue executor [`Self::drive_workflow`], which spawns any node whose
/// dependencies are now satisfied (without waiting for the rest of the completing node's layer)
/// and either suspends on the still-running set or finishes the workflow.
pub(super) fn advance_workflow(&mut self, result: SubAgentResult) -> LoopAction {
let agent_id = result.agent_id.to_string();
if let Some(run) = self.workflow.as_mut() {
run.record_completion(&agent_id, result.result.clone());
}
self.drive_workflow(Some(agent_id))
}
}