mlua_swarm/core/state.rs
//! `EngineState` — the single `Mutex`-guarded state object — plus the
//! supporting types.
//!
//! `EngineState` holds every mutable piece of engine flow state (task
//! table, session table, prompts, per-attempt system prompts, token
//! records, worker handles, resume table, per-task notifiers, resources,
//! per-attempt output events, and the event log tail). It sits on the
//! Domain side of the Data / Domain split and is unchanged by the
//! Data-plane (`output_store` module) refactor.
9
10use crate::types::{CapToken, Role, SessionId, TaskId};
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use std::collections::HashMap;
14use std::sync::Arc;
15use tokio::sync::{broadcast, Notify};
16
17// ─── Resume / Task ─────────────────────────────────────────────────────────
18
19/// Opaque handle identifying one `query_senior` suspend/`resume` cycle.
20/// Stored on `TaskState.suspended_on` and as the key of
21/// `EngineState.pending_resumes`.
22#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub struct ResumeKey(pub String);
24
25impl ResumeKey {
26 /// Generate a random key (`R-<12 hex bytes>`).
27 pub fn new() -> Self {
28 Self(format!("R-{}", crate::types::uid_hex(12)))
29 }
30
31 /// Deterministic key for a Senior-escalation suspend on `task_id`
32 /// (`R-senior-<task_id>`), so repeated escalations on the same task
33 /// are addressable without extra bookkeeping.
34 pub fn for_senior(task_id: &TaskId) -> Self {
35 Self(format!("R-senior-{}", task_id.0))
36 }
37}
38
39impl Default for ResumeKey {
40 fn default() -> Self {
41 Self::new()
42 }
43}
44
45/// Lifecycle state of a task. `Pending` is the only non-terminal,
46/// non-`Suspended` state before the first `dispatch_attempt_with`;
47/// `Pass` / `Blocked` / `Cancelled` are terminal.
48#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
49#[serde(rename_all = "snake_case")]
50pub enum TaskStatus {
51 /// Created via `start_task`, not yet dispatched.
52 Pending,
53 /// A `dispatch_attempt_with` call is in flight for this task.
54 Running,
55 /// Suspended awaiting a `query_senior`/`resume` round-trip.
56 Suspended,
57 /// The last attempt completed with `ok = true`.
58 Pass,
59 /// The last attempt completed with `ok = false` (or dispatch failed).
60 Blocked,
61 /// Cancelled via `cancel_task`.
62 Cancelled,
63}
64
65/// Static task definition supplied to `start_task`: which agent runs it
66/// and the initial prompt/directive text.
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct TaskSpec {
69 /// Name of the agent that should execute this task.
70 pub agent: String,
71 /// Prompt/directive text seeded for attempt 1.
72 pub initial_directive: String,
73}
74
75/// The full mutable record of one task: its static `spec`, current
76/// `status`, attempt counter, and bookkeeping timestamps. Cloned out of
77/// `EngineState` on every read (e.g. by `read_task_state` / `poll_task`).
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct TaskState {
80 /// Unique task identifier (assigned by `start_task`).
81 pub id: TaskId,
82 /// The static spec this task was created from.
83 pub spec: TaskSpec,
84 /// Current lifecycle status.
85 pub status: TaskStatus,
86 /// 1-based counter, bumped by `Engine::dispatch_attempt_with` each
87 /// time this task is dispatched.
88 pub attempt: u32,
89 /// Set while `status == Suspended`; the key needed to `resume` it.
90 pub suspended_on: Option<ResumeKey>,
91 /// Most recent result value posted via `post_result` or produced by a
92 /// completed attempt.
93 pub last_result: Option<Value>,
94 /// Unix timestamp (seconds) when the task was created.
95 pub created_at: u64,
96 /// Unix timestamp (seconds) of the last state mutation.
97 pub updated_at: u64,
98 /// Recursive swarm depth. The root (an Operator calling
99 /// `start_task`) is 0; a child spawned by a Worker calling
100 /// `start_task` is its parent's `depth + 1`. Exceeding
101 /// `EngineCfg.max_spawn_depth` raises `SpawnDepthExceeded`.
102 #[serde(default)]
103 pub spawn_depth: u32,
104}
105
106impl TaskState {
107 /// Construct a new `Pending` task with `attempt = 0` and
108 /// `spawn_depth = 0`; `created_at`/`updated_at` are set to now.
109 pub fn new(id: TaskId, spec: TaskSpec) -> Self {
110 let now = crate::types::now_unix();
111 Self {
112 id,
113 spec,
114 status: TaskStatus::Pending,
115 attempt: 0,
116 suspended_on: None,
117 last_result: None,
118 created_at: now,
119 updated_at: now,
120 spawn_depth: 0,
121 }
122 }
123}
124
125/// Result of a `dispatch_attempt_with` call (or the conceptual outcome of
126/// a task attempt more broadly).
127#[derive(Debug, Clone, Serialize, Deserialize)]
128pub enum DispatchOutcome {
129 /// The attempt completed with `ok = true`; carries the result value.
130 Pass(Value),
131 /// The attempt completed with `ok = false`, or dispatch itself failed;
132 /// carries the result/error value.
133 Blocked(Value),
134 /// The task suspended (e.g. via `query_senior`) before completing;
135 /// carries the key needed to `resume` it.
136 Suspended(ResumeKey),
137 /// The task was cancelled before completing.
138 Cancelled,
139 /// The attempt did not complete within the allotted time.
140 Timeout,
141}
142
143// ─── Session ───────────────────────────────────────────────────────────────
144
145/// Persisted record of one attached Operator session: identity, role,
146/// heartbeat bookkeeping, owned tasks, and the `OperatorKind` cascade
147/// inputs plus registry IDs used to rebuild `OperatorInfo` on dispatch
148/// (see `Engine::resolve_operator_info`).
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct OperatorSession {
151 /// Unique session identifier (distinct from the token nonce).
152 pub id: SessionId,
153 /// Caller-supplied name identifying the Operator (not necessarily
154 /// unique across sessions).
155 pub operator_id: String,
156 /// Role the session's token was minted with.
157 pub role: Role,
158 /// Unix timestamp (seconds) when the session was attached.
159 pub attached_at: u64,
160 /// Unix timestamp (seconds) of the last heartbeat/attach touch.
161 pub last_seen: u64,
162 /// Whether the session is currently considered live. Flipped to
163 /// `false` by `detach` or by `start_detach_loop` on a heartbeat miss.
164 pub attached: bool,
165 /// Task IDs started by this session (via `start_task` while this
166 /// session's token was current).
167 pub owned_task_ids: Vec<TaskId>,
168 /// Nonce of the `CapToken` this session was attached with; used to
169 /// look sessions up by token in `with_state` closures.
170 pub token_nonce: String,
171 /// The Operator's `kind`, plus IDs of
172 /// the `SeniorBridge` / `SpawnHook` registered on the engine's
173 /// `BridgeRegistry`. Persisted (all `String`; no `Arc<dyn ...>`). At
174 /// `dispatch_attempt` time the engine looks these up in the registry
175 /// and builds an `OperatorInfo` to inject into `Ctx`.
176 ///
177 /// # 4-tier `OperatorKind` cascade — "Runtime Global" tier
178 ///
179 /// This field is the literal value passed to `Engine::attach_with_ids`'s
180 /// `kind` parameter, and is fed to `crate::core::ctx::collapse_operator_kind`
181 /// as the `runtime_global` tier verbatim: `Some(_)` is always an
182 /// explicit Runtime Global request that outranks both BP tiers — even
183 /// `Some(OperatorKind::Automate)` — and `None` means "not requested",
184 /// letting the BP-level tiers (`bp_agent_kinds` / `bp_global_kind`) take
185 /// over. `#[serde(default)]` keeps existing persisted sessions (from
186 /// before this field existed / was `Option`) deserializing as `None`.
187 /// See `crate::core::ctx::collapse_operator_kind` for the full cascade +
188 /// rationale.
189 #[serde(default)]
190 pub operator_kind: Option<crate::core::ctx::OperatorKind>,
191 /// "Runtime Agent-level" tier (highest priority) of the `OperatorKind`
192 /// cascade — per-agent override supplied at task-launch time via
193 /// `TaskLaunchInput.operator_kind_overrides` / `TaskApplicationInput
194 /// .operator_kind_overrides`. Keyed by `AgentDef.name`.
195 #[serde(default)]
196 pub runtime_agent_kinds: HashMap<String, crate::core::ctx::OperatorKind>,
197 /// "BP Agent-level" tier of the `OperatorKind` cascade — baked at
198 /// `TaskLaunchService::launch` time from `Blueprint.operators[].kind`,
199 /// resolved per-agent via `AgentDef.spec.operator_ref`. Keyed by
200 /// `AgentDef.name` (not `OperatorDef.name`).
201 #[serde(default)]
202 pub bp_agent_kinds: HashMap<String, crate::core::ctx::OperatorKind>,
203 /// "BP Global" tier of the `OperatorKind` cascade — baked at
204 /// `TaskLaunchService::launch` time from `Blueprint.default_operator_kind`.
205 #[serde(default)]
206 pub bp_global_kind: Option<crate::core::ctx::OperatorKind>,
207 /// ID of the `Arc<dyn SeniorBridge>` registered on the engine's
208 /// `BridgeRegistry`, if any; resolved back into `OperatorInfo.senior_bridge`.
209 #[serde(default)]
210 pub bridge_id: Option<String>,
211 /// ID of the `Arc<dyn SpawnHook>` registered on the engine's
212 /// `BridgeRegistry`, if any; resolved back into `OperatorInfo.spawn_hook`.
213 #[serde(default)]
214 pub hook_id: Option<String>,
215 /// ID of the `Arc<dyn Operator>` registered on the `OperatorRegistry`.
216 /// Used by `OperatorDelegateMiddleware` when `kind = MainAi` /
217 /// `Composite` and `operator_id` is `Some`: it delegates the entire
218 /// spawn to `operator.execute`.
219 #[serde(default)]
220 pub operator_backend_id: Option<String>,
221}
222
223// ─── Token record (= server-side counter holder) ──────────────────────────
224
225/// Server-side counter/state holder paired 1:1 with a minted `CapToken`
226/// (keyed by nonce in `EngineState.tokens`). Tracks remaining uses,
227/// revocation, and — for Worker tokens — the task the token is bound to.
228#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct CapTokenRecord {
230 /// The token this record backs.
231 pub token: CapToken,
232 /// Remaining number of verb-consuming calls. `None` means unlimited
233 /// (session-style tokens); `Some(0)` makes `consume` fail.
234 pub uses_left: Option<u32>, // None = unlimited (session)
235 /// When `true`, `consume` always fails regardless of `uses_left`.
236 pub revoked: bool,
237 /// The task this Worker token is bound to (set when minted via
238 /// `dispatch_attempt`). Used on two axes:
239 /// 1. **Depth tracking.** When a Worker calls `start_task` to spawn a
240 /// child, the child receives this task's `spawn_depth + 1`.
241 /// 2. **Ownership gate.** When a Worker calls a state-touch verb
242 /// (`fetch_prompt` / `post_result` / `read_task_state` /
243 /// `cancel_task` / `poll_task`), the argument's `task_id` must
244 /// match this value. `start_task`
245 /// and `dispatch_attempt` are exempt — recursive swarming must
246 /// stay open, and depth is capped by `max_spawn_depth`.
247 ///
248 /// Operator tokens (minted at attach time) leave this `None`, so
249 /// they can touch any task.
250 #[serde(default)]
251 pub task_id: Option<TaskId>,
252}
253
254impl CapTokenRecord {
255 /// Wrap a freshly minted `CapToken` with no bound task (`task_id =
256 /// None`) — the shape used for Operator/session tokens.
257 pub fn from_token(token: CapToken) -> Self {
258 Self {
259 uses_left: token.max_uses,
260 token,
261 revoked: false,
262 task_id: None,
263 }
264 }
265
266 /// Convenience constructor used when minting a Worker token — binds
267 /// the record to the target task.
268 pub fn from_worker_token(token: CapToken, task_id: TaskId) -> Self {
269 Self {
270 uses_left: token.max_uses,
271 token,
272 revoked: false,
273 task_id: Some(task_id),
274 }
275 }
276
277 /// Consume one use. `None` (session token) always returns `Ok`;
278 /// `Some(0)` returns `Err`.
279 pub fn consume(&mut self) -> Result<(), CapTokenConsumeError> {
280 if self.revoked {
281 return Err(CapTokenConsumeError::Revoked);
282 }
283 match self.uses_left.as_mut() {
284 None => Ok(()),
285 Some(0) => Err(CapTokenConsumeError::Exhausted),
286 Some(n) => {
287 *n -= 1;
288 Ok(())
289 }
290 }
291 }
292}
293
294/// Why [`CapTokenRecord::consume`] refused to spend a use.
295#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
296pub enum CapTokenConsumeError {
297 /// The record was explicitly revoked (`revoked = true`); revocation
298 /// is permanent and independent of `uses_left`.
299 #[error("token revoked")]
300 Revoked,
301 /// The record's `uses_left` budget (`Some(0)`) is spent.
302 #[error("token uses exhausted")]
303 Exhausted,
304}
305
306// ─── Event ─────────────────────────────────────────────────────────────────
307
308/// Engine lifecycle event. Every event is both appended to
309/// `EngineState.event_log_tail` (in-process ring buffer) and broadcast on
310/// `Engine::event_tx` for live subscribers.
311#[derive(Debug, Clone, Serialize, Deserialize)]
312#[serde(tag = "kind", rename_all = "snake_case")]
313pub enum Event {
314 /// A session was attached (`attach` / `attach_with` / `attach_with_ids`).
315 SessionAttached {
316 /// The newly attached session.
317 session_id: SessionId,
318 /// Role its token was minted with.
319 role: Role,
320 },
321 /// A session was detached (`detach`, or a heartbeat-miss timeout).
322 SessionDetached {
323 /// The session that was detached.
324 session_id: SessionId,
325 },
326 /// A new task was created via `start_task`.
327 TaskCreated {
328 /// The newly created task.
329 task_id: TaskId,
330 },
331 /// An attempt began dispatching (not currently emitted by
332 /// `dispatch_attempt_with`; reserved for future use).
333 TaskAttemptStarted {
334 /// The task being dispatched.
335 task_id: TaskId,
336 /// The attempt number.
337 attempt: u32,
338 },
339 /// An attempt finished, Pass or Blocked, with the resulting value.
340 TaskAttemptCompleted {
341 /// The task whose attempt completed.
342 task_id: TaskId,
343 /// The attempt number that completed.
344 attempt: u32,
345 /// The result value produced by the attempt.
346 result: Value,
347 },
348 /// The task attempt completed with `ok = true`.
349 TaskPass {
350 /// The task that passed.
351 task_id: TaskId,
352 /// The result value.
353 result: Value,
354 },
355 /// The task attempt completed with `ok = false`.
356 TaskBlocked {
357 /// The task that was blocked.
358 task_id: TaskId,
359 /// The result/error value.
360 result: Value,
361 },
362 /// A worker appended an `OutputEvent` via `submit_output`.
363 WorkerOutput {
364 /// The task the output belongs to.
365 task_id: TaskId,
366 /// The attempt the output belongs to.
367 attempt: u32,
368 /// The appended output event.
369 event: crate::worker::output::OutputEvent,
370 },
371 /// The task suspended pending a `resume` for `key`.
372 TaskSuspended {
373 /// The suspended task.
374 task_id: TaskId,
375 /// The key needed to `resume` it.
376 key: ResumeKey,
377 },
378 /// The task resumed after `resume(key, ..)` was called.
379 TaskResumed {
380 /// The resumed task.
381 task_id: TaskId,
382 /// The key that was resumed.
383 key: ResumeKey,
384 },
385 /// The task was cancelled via `cancel_task`.
386 TaskCancelled {
387 /// The cancelled task.
388 task_id: TaskId,
389 },
390 /// `query_senior` was called, asking `question` on behalf of `task_id`.
391 SeniorQueried {
392 /// The task that triggered the query.
393 task_id: TaskId,
394 /// The question posed to the Senior.
395 question: Value,
396 },
397 /// A Senior's `answer` was stored via `resume`.
398 SeniorAnswered {
399 /// The task the answer applies to.
400 task_id: TaskId,
401 /// The Senior's answer.
402 answer: Value,
403 },
404}
405
406/// Receiver half of the engine-wide `Event` broadcast channel, obtained
407/// via `Engine::subscribe`.
408pub type EventStream = broadcast::Receiver<Event>;
409
410// ─── Resume pending (= Notify-based wait + stored answer) ─────────────────
411
412/// Entry for a task suspended via `query_senior`, waiting to be resumed.
413///
414/// The `Notify` + `answer: Option<Value>` form (rather than a oneshot
415/// channel) is deliberate: the answer stays inside `EngineState` even if
416/// the caller (an Operator) **detaches and reattaches**, so it can pull
417/// the answer out via `await_resume` after reattach.
418#[derive(Debug, Clone)]
419pub struct ResumePending {
420 /// Wakes any `await_resume` waiter once `answer` is set.
421 pub notify: Arc<Notify>,
422 /// The stored answer, once `resume` has been called for this key.
423 pub answer: Option<Value>,
424}
425
426impl ResumePending {
427 /// Create an unanswered pending entry (fresh `Notify`, `answer = None`).
428 pub fn new() -> Self {
429 Self {
430 notify: Arc::new(Notify::new()),
431 answer: None,
432 }
433 }
434}
435
436impl Default for ResumePending {
437 fn default() -> Self {
438 Self::new()
439 }
440}
441
442// ─── EngineState (= the locked thing) ──────────────────────────────────────
443
444/// The single `Mutex`-guarded blob of engine flow state, accessed only
445/// through `Engine::with_state` (see the R1-R4 discipline documented
446/// there).
447#[derive(Debug)]
448pub struct EngineState {
449 /// All known tasks, keyed by `TaskId`.
450 pub tasks: HashMap<TaskId, TaskState>,
451 /// All attached/detached sessions, keyed by `SessionId`.
452 pub sessions: HashMap<SessionId, OperatorSession>,
453 /// Per-`(task_id, attempt)` prompt/directive text, seeded from
454 /// `TaskSpec.initial_directive` and fetched via `fetch_prompt`.
455 pub prompts: HashMap<(TaskId, u32), String>,
456 /// Per-attempt `system_prompt`: `AgentDef.profile.system_prompt` is
457 /// baked at compile time, rendered inside `OperatorSpawner::spawn`,
458 /// and stashed here for the SubAgent to fetch alongside its prompt via
459 /// `HTTP /v1/worker/prompt`. The value is `Option<String>` so a missing
460 /// profile can be distinguished: an absent key means "not yet baked",
461 /// while `Some(None)` means "baked and profile is explicitly absent".
462 pub systems: HashMap<(TaskId, u32), Option<String>>,
463 /// All minted `CapToken` records, keyed by token nonce.
464 pub tokens: HashMap<String, CapTokenRecord>, // key = token nonce
465 /// Short worker handle (`wh-XXXXXXXX`, 12 chars) → token-nonce lookup
466 /// map. Resolves the `worker_handle` field a SubAgent receives with its
467 /// prompt. There is no signature verification: `task_id` is resolved by
468 /// a plain `HashMap` lookup — deliberately thin for the local
469 /// running over WebSocket, and adopted specifically to remove the
470 /// base64 copy-paste failure mode.
471 pub worker_handles: HashMap<String, String>,
472 /// Outstanding `query_senior` suspensions awaiting `resume`.
473 pub pending_resumes: HashMap<ResumeKey, ResumePending>,
474 /// Per-task notifier — `notify_waiters` fires on every task-status
475 /// change. Used by `poll_task` on the caller side, and by callers that
476 /// need to `await` again after detach/reattach.
477 pub task_notifies: HashMap<TaskId, Arc<Notify>>,
478 /// Arbitrary named resources set via `set_resource` and read via
479 /// `fetch_data`.
480 pub resources: HashMap<String, Value>,
481 /// Per-attempt output-event log. The `SpawnerAdapter` appends via
482 /// `submit_output`; the dispatch path pulls the terminal
483 /// `OutputEvent::Final` off the tail and decides Pass / Blocked.
484 pub output_store: HashMap<(TaskId, u32), Vec<crate::worker::output::OutputEvent>>,
485 /// Bounded in-process tail of recent `Event`s (most recent last),
486 /// trimmed to `event_log_max` by `push_event`.
487 pub event_log_tail: Vec<Event>,
488 /// Maximum length of `event_log_tail` before older entries are
489 /// dropped.
490 pub event_log_max: usize,
491}
492
493impl EngineState {
494 /// Construct an empty `EngineState` with `event_log_max = 1024`.
495 pub fn new() -> Self {
496 Self {
497 tasks: HashMap::new(),
498 sessions: HashMap::new(),
499 prompts: HashMap::new(),
500 systems: HashMap::new(),
501 tokens: HashMap::new(),
502 worker_handles: HashMap::new(),
503 pending_resumes: HashMap::new(),
504 task_notifies: HashMap::new(),
505 resources: HashMap::new(),
506 output_store: HashMap::new(),
507 event_log_tail: Vec::new(),
508 event_log_max: 1024,
509 }
510 }
511
512 /// Ensure a per-task `Notify` exists; return the existing one if any.
513 pub fn ensure_task_notify(&mut self, task_id: &TaskId) -> Arc<Notify> {
514 self.task_notifies
515 .entry(task_id.clone())
516 .or_insert_with(|| Arc::new(Notify::new()))
517 .clone()
518 }
519
520 /// Append `ev` to `event_log_tail`, trimming the oldest entries once
521 /// `event_log_max` is exceeded.
522 pub fn push_event(&mut self, ev: Event) {
523 self.event_log_tail.push(ev);
524 if self.event_log_tail.len() > self.event_log_max {
525 let overflow = self.event_log_tail.len() - self.event_log_max;
526 self.event_log_tail.drain(..overflow);
527 }
528 }
529}
530
531impl Default for EngineState {
532 fn default() -> Self {
533 Self::new()
534 }
535}