Skip to main content

mlua_swarm/core/
state.rs

//! `EngineState` — the single `Mutex`-guarded state object — plus the
//! supporting types.
//!
//! `EngineState` holds every mutable piece of engine flow state (task
//! table, session table, prompts and per-attempt system prompts, token
//! records, worker handles, resume table, per-task notifiers, resources,
//! per-attempt output events, and the event log tail). It sits on the
//! Domain side of the Data / Domain split and is unchanged by the
//! Data-plane (`output_store` module) refactor.
9
10use crate::types::{CapToken, Role, SessionId, TaskId};
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use std::collections::HashMap;
14use std::sync::Arc;
15use tokio::sync::{broadcast, Notify};
16
17// ─── Resume / Task ─────────────────────────────────────────────────────────
18
19/// Opaque handle identifying one `query_senior` suspend/`resume` cycle.
20/// Stored on `TaskState.suspended_on` and as the key of
21/// `EngineState.pending_resumes`.
22#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub struct ResumeKey(pub String);
24
25impl ResumeKey {
26    /// Generate a random key (`R-<12 hex bytes>`).
27    pub fn new() -> Self {
28        Self(format!("R-{}", crate::types::uid_hex(12)))
29    }
30
31    /// Deterministic key for a Senior-escalation suspend on `task_id`
32    /// (`R-senior-<task_id>`), so repeated escalations on the same task
33    /// are addressable without extra bookkeeping.
34    pub fn for_senior(task_id: &TaskId) -> Self {
35        Self(format!("R-senior-{}", task_id.0))
36    }
37}
38
39impl Default for ResumeKey {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
45/// Lifecycle state of a task. `Pending` is the only non-terminal,
46/// non-`Suspended` state before the first `dispatch_attempt_with`;
47/// `Pass` / `Blocked` / `Cancelled` are terminal.
48#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
49#[serde(rename_all = "snake_case")]
50pub enum TaskStatus {
51    /// Created via `start_task`, not yet dispatched.
52    Pending,
53    /// A `dispatch_attempt_with` call is in flight for this task.
54    Running,
55    /// Suspended awaiting a `query_senior`/`resume` round-trip.
56    Suspended,
57    /// The last attempt completed with `ok = true`.
58    Pass,
59    /// The last attempt completed with `ok = false` (or dispatch failed).
60    Blocked,
61    /// Cancelled via `cancel_task`.
62    Cancelled,
63}
64
65/// Static task definition supplied to `start_task`: which agent runs it
66/// and the initial prompt/directive text.
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct TaskSpec {
69    /// Name of the agent that should execute this task.
70    pub agent: String,
71    /// Prompt/directive text seeded for attempt 1.
72    pub initial_directive: String,
73}
74
75/// The full mutable record of one task: its static `spec`, current
76/// `status`, attempt counter, and bookkeeping timestamps. Cloned out of
77/// `EngineState` on every read (e.g. by `read_task_state` / `poll_task`).
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct TaskState {
80    /// Unique task identifier (assigned by `start_task`).
81    pub id: TaskId,
82    /// The static spec this task was created from.
83    pub spec: TaskSpec,
84    /// Current lifecycle status.
85    pub status: TaskStatus,
86    /// 1-based counter, bumped by `Engine::dispatch_attempt_with` each
87    /// time this task is dispatched.
88    pub attempt: u32,
89    /// Set while `status == Suspended`; the key needed to `resume` it.
90    pub suspended_on: Option<ResumeKey>,
91    /// Most recent result value posted via `post_result` or produced by a
92    /// completed attempt.
93    pub last_result: Option<Value>,
94    /// Unix timestamp (seconds) when the task was created.
95    pub created_at: u64,
96    /// Unix timestamp (seconds) of the last state mutation.
97    pub updated_at: u64,
98    /// Recursive swarm depth. The root (an Operator calling
99    /// `start_task`) is 0; a child spawned by a Worker calling
100    /// `start_task` is its parent's `depth + 1`. Exceeding
101    /// `EngineCfg.max_spawn_depth` raises `SpawnDepthExceeded`.
102    #[serde(default)]
103    pub spawn_depth: u32,
104}
105
106impl TaskState {
107    /// Construct a new `Pending` task with `attempt = 0` and
108    /// `spawn_depth = 0`; `created_at`/`updated_at` are set to now.
109    pub fn new(id: TaskId, spec: TaskSpec) -> Self {
110        let now = crate::types::now_unix();
111        Self {
112            id,
113            spec,
114            status: TaskStatus::Pending,
115            attempt: 0,
116            suspended_on: None,
117            last_result: None,
118            created_at: now,
119            updated_at: now,
120            spawn_depth: 0,
121        }
122    }
123}
124
125/// Result of a `dispatch_attempt_with` call (or the conceptual outcome of
126/// a task attempt more broadly).
127#[derive(Debug, Clone, Serialize, Deserialize)]
128pub enum DispatchOutcome {
129    /// The attempt completed with `ok = true`; carries the result value.
130    Pass(Value),
131    /// The attempt completed with `ok = false`, or dispatch itself failed;
132    /// carries the result/error value.
133    Blocked(Value),
134    /// The task suspended (e.g. via `query_senior`) before completing;
135    /// carries the key needed to `resume` it.
136    Suspended(ResumeKey),
137    /// The task was cancelled before completing.
138    Cancelled,
139    /// The attempt did not complete within the allotted time.
140    Timeout,
141}
142
143// ─── Session ───────────────────────────────────────────────────────────────
144
145/// Persisted record of one attached Operator session: identity, role,
146/// heartbeat bookkeeping, owned tasks, and the `OperatorKind` cascade
147/// inputs plus registry IDs used to rebuild `OperatorInfo` on dispatch
148/// (see `Engine::resolve_operator_info`).
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct OperatorSession {
151    /// Unique session identifier (distinct from the token nonce).
152    pub id: SessionId,
153    /// Caller-supplied name identifying the Operator (not necessarily
154    /// unique across sessions).
155    pub operator_id: String,
156    /// Role the session's token was minted with.
157    pub role: Role,
158    /// Unix timestamp (seconds) when the session was attached.
159    pub attached_at: u64,
160    /// Unix timestamp (seconds) of the last heartbeat/attach touch.
161    pub last_seen: u64,
162    /// Whether the session is currently considered live. Flipped to
163    /// `false` by `detach` or by `start_detach_loop` on a heartbeat miss.
164    pub attached: bool,
165    /// Task IDs started by this session (via `start_task` while this
166    /// session's token was current).
167    pub owned_task_ids: Vec<TaskId>,
168    /// Nonce of the `CapToken` this session was attached with; used to
169    /// look sessions up by token in `with_state` closures.
170    pub token_nonce: String,
171    /// The Operator's `kind`, plus IDs of
172    /// the `SeniorBridge` / `SpawnHook` registered on the engine's
173    /// `BridgeRegistry`. Persisted (all `String`; no `Arc<dyn ...>`). At
174    /// `dispatch_attempt` time the engine looks these up in the registry
175    /// and builds an `OperatorInfo` to inject into `Ctx`.
176    ///
177    /// # 4-tier `OperatorKind` cascade — "Runtime Global" tier
178    ///
179    /// This field is the literal value passed to `Engine::attach_with_ids`'s
180    /// `kind` parameter, and is fed to `crate::core::ctx::collapse_operator_kind`
181    /// as the `runtime_global` tier verbatim: `Some(_)` is always an
182    /// explicit Runtime Global request that outranks both BP tiers — even
183    /// `Some(OperatorKind::Automate)` — and `None` means "not requested",
184    /// letting the BP-level tiers (`bp_agent_kinds` / `bp_global_kind`) take
185    /// over. `#[serde(default)]` keeps existing persisted sessions (from
186    /// before this field existed / was `Option`) deserializing as `None`.
187    /// See `crate::core::ctx::collapse_operator_kind` for the full cascade +
188    /// rationale.
189    #[serde(default)]
190    pub operator_kind: Option<crate::core::ctx::OperatorKind>,
191    /// "Runtime Agent-level" tier (highest priority) of the `OperatorKind`
192    /// cascade — per-agent override supplied at task-launch time via
193    /// `TaskLaunchInput.operator_kind_overrides` / `TaskApplicationInput
194    /// .operator_kind_overrides`. Keyed by `AgentDef.name`.
195    #[serde(default)]
196    pub runtime_agent_kinds: HashMap<String, crate::core::ctx::OperatorKind>,
197    /// "BP Agent-level" tier of the `OperatorKind` cascade — baked at
198    /// `TaskLaunchService::launch` time from `Blueprint.operators[].kind`,
199    /// resolved per-agent via `AgentDef.spec.operator_ref`. Keyed by
200    /// `AgentDef.name` (not `OperatorDef.name`).
201    #[serde(default)]
202    pub bp_agent_kinds: HashMap<String, crate::core::ctx::OperatorKind>,
203    /// "BP Global" tier of the `OperatorKind` cascade — baked at
204    /// `TaskLaunchService::launch` time from `Blueprint.default_operator_kind`.
205    #[serde(default)]
206    pub bp_global_kind: Option<crate::core::ctx::OperatorKind>,
207    /// ID of the `Arc<dyn SeniorBridge>` registered on the engine's
208    /// `BridgeRegistry`, if any; resolved back into `OperatorInfo.senior_bridge`.
209    #[serde(default)]
210    pub bridge_id: Option<String>,
211    /// ID of the `Arc<dyn SpawnHook>` registered on the engine's
212    /// `BridgeRegistry`, if any; resolved back into `OperatorInfo.spawn_hook`.
213    #[serde(default)]
214    pub hook_id: Option<String>,
215    /// ID of the `Arc<dyn Operator>` registered on the `OperatorRegistry`.
216    /// Used by `OperatorDelegateMiddleware` when `kind = MainAi` /
217    /// `Composite` and `operator_id` is `Some`: it delegates the entire
218    /// spawn to `operator.execute`.
219    #[serde(default)]
220    pub operator_backend_id: Option<String>,
221}
222
223// ─── Token record (= server-side counter holder) ──────────────────────────
224
225/// Server-side counter/state holder paired 1:1 with a minted `CapToken`
226/// (keyed by nonce in `EngineState.tokens`). Tracks remaining uses,
227/// revocation, and — for Worker tokens — the task the token is bound to.
228#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct CapTokenRecord {
230    /// The token this record backs.
231    pub token: CapToken,
232    /// Remaining number of verb-consuming calls. `None` means unlimited
233    /// (session-style tokens); `Some(0)` makes `consume` fail.
234    pub uses_left: Option<u32>, // None = unlimited (session)
235    /// When `true`, `consume` always fails regardless of `uses_left`.
236    pub revoked: bool,
237    /// The task this Worker token is bound to (set when minted via
238    /// `dispatch_attempt`). Used on two axes:
239    ///   1. **Depth tracking.** When a Worker calls `start_task` to spawn a
240    ///      child, the child receives this task's `spawn_depth + 1`.
241    ///   2. **Ownership gate.** When a Worker calls a state-touch verb
242    ///      (`fetch_prompt` / `post_result` / `read_task_state` /
243    ///      `cancel_task` / `poll_task`), the argument's `task_id` must
244    ///      match this value. `start_task`
245    ///      and `dispatch_attempt` are exempt — recursive swarming must
246    ///      stay open, and depth is capped by `max_spawn_depth`.
247    ///
248    ///      Operator tokens (minted at attach time) leave this `None`, so
249    ///      they can touch any task.
250    #[serde(default)]
251    pub task_id: Option<TaskId>,
252}
253
254impl CapTokenRecord {
255    /// Wrap a freshly minted `CapToken` with no bound task (`task_id =
256    /// None`) — the shape used for Operator/session tokens.
257    pub fn from_token(token: CapToken) -> Self {
258        Self {
259            uses_left: token.max_uses,
260            token,
261            revoked: false,
262            task_id: None,
263        }
264    }
265
266    /// Convenience constructor used when minting a Worker token — binds
267    /// the record to the target task.
268    pub fn from_worker_token(token: CapToken, task_id: TaskId) -> Self {
269        Self {
270            uses_left: token.max_uses,
271            token,
272            revoked: false,
273            task_id: Some(task_id),
274        }
275    }
276
277    /// Consume one use. `None` (session token) always returns `Ok`;
278    /// `Some(0)` returns `Err`.
279    pub fn consume(&mut self) -> Result<(), CapTokenConsumeError> {
280        if self.revoked {
281            return Err(CapTokenConsumeError::Revoked);
282        }
283        match self.uses_left.as_mut() {
284            None => Ok(()),
285            Some(0) => Err(CapTokenConsumeError::Exhausted),
286            Some(n) => {
287                *n -= 1;
288                Ok(())
289            }
290        }
291    }
292}
293
294/// Why [`CapTokenRecord::consume`] refused to spend a use.
295#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
296pub enum CapTokenConsumeError {
297    /// The record was explicitly revoked (`revoked = true`); revocation
298    /// is permanent and independent of `uses_left`.
299    #[error("token revoked")]
300    Revoked,
301    /// The record's `uses_left` budget (`Some(0)`) is spent.
302    #[error("token uses exhausted")]
303    Exhausted,
304}
305
306// ─── Event ─────────────────────────────────────────────────────────────────
307
308/// Engine lifecycle event. Every event is both appended to
309/// `EngineState.event_log_tail` (in-process ring buffer) and broadcast on
310/// `Engine::event_tx` for live subscribers.
311#[derive(Debug, Clone, Serialize, Deserialize)]
312#[serde(tag = "kind", rename_all = "snake_case")]
313pub enum Event {
314    /// A session was attached (`attach` / `attach_with` / `attach_with_ids`).
315    SessionAttached {
316        /// The newly attached session.
317        session_id: SessionId,
318        /// Role its token was minted with.
319        role: Role,
320    },
321    /// A session was detached (`detach`, or a heartbeat-miss timeout).
322    SessionDetached {
323        /// The session that was detached.
324        session_id: SessionId,
325    },
326    /// A new task was created via `start_task`.
327    TaskCreated {
328        /// The newly created task.
329        task_id: TaskId,
330    },
331    /// An attempt began dispatching (not currently emitted by
332    /// `dispatch_attempt_with`; reserved for future use).
333    TaskAttemptStarted {
334        /// The task being dispatched.
335        task_id: TaskId,
336        /// The attempt number.
337        attempt: u32,
338    },
339    /// An attempt finished, Pass or Blocked, with the resulting value.
340    TaskAttemptCompleted {
341        /// The task whose attempt completed.
342        task_id: TaskId,
343        /// The attempt number that completed.
344        attempt: u32,
345        /// The result value produced by the attempt.
346        result: Value,
347    },
348    /// The task attempt completed with `ok = true`.
349    TaskPass {
350        /// The task that passed.
351        task_id: TaskId,
352        /// The result value.
353        result: Value,
354    },
355    /// The task attempt completed with `ok = false`.
356    TaskBlocked {
357        /// The task that was blocked.
358        task_id: TaskId,
359        /// The result/error value.
360        result: Value,
361    },
362    /// A worker appended an `OutputEvent` via `submit_output`.
363    WorkerOutput {
364        /// The task the output belongs to.
365        task_id: TaskId,
366        /// The attempt the output belongs to.
367        attempt: u32,
368        /// The appended output event.
369        event: crate::worker::output::OutputEvent,
370    },
371    /// The task suspended pending a `resume` for `key`.
372    TaskSuspended {
373        /// The suspended task.
374        task_id: TaskId,
375        /// The key needed to `resume` it.
376        key: ResumeKey,
377    },
378    /// The task resumed after `resume(key, ..)` was called.
379    TaskResumed {
380        /// The resumed task.
381        task_id: TaskId,
382        /// The key that was resumed.
383        key: ResumeKey,
384    },
385    /// The task was cancelled via `cancel_task`.
386    TaskCancelled {
387        /// The cancelled task.
388        task_id: TaskId,
389    },
390    /// `query_senior` was called, asking `question` on behalf of `task_id`.
391    SeniorQueried {
392        /// The task that triggered the query.
393        task_id: TaskId,
394        /// The question posed to the Senior.
395        question: Value,
396    },
397    /// A Senior's `answer` was stored via `resume`.
398    SeniorAnswered {
399        /// The task the answer applies to.
400        task_id: TaskId,
401        /// The Senior's answer.
402        answer: Value,
403    },
404}
405
406/// Receiver half of the engine-wide `Event` broadcast channel, obtained
407/// via `Engine::subscribe`.
408pub type EventStream = broadcast::Receiver<Event>;
409
410// ─── Resume pending (= Notify-based wait + stored answer) ─────────────────
411
412/// Entry for a task suspended via `query_senior`, waiting to be resumed.
413///
414/// The `Notify` + `answer: Option<Value>` form (rather than a oneshot
415/// channel) is deliberate: the answer stays inside `EngineState` even if
416/// the caller (an Operator) **detaches and reattaches**, so it can pull
417/// the answer out via `await_resume` after reattach.
418#[derive(Debug, Clone)]
419pub struct ResumePending {
420    /// Wakes any `await_resume` waiter once `answer` is set.
421    pub notify: Arc<Notify>,
422    /// The stored answer, once `resume` has been called for this key.
423    pub answer: Option<Value>,
424}
425
426impl ResumePending {
427    /// Create an unanswered pending entry (fresh `Notify`, `answer = None`).
428    pub fn new() -> Self {
429        Self {
430            notify: Arc::new(Notify::new()),
431            answer: None,
432        }
433    }
434}
435
436impl Default for ResumePending {
437    fn default() -> Self {
438        Self::new()
439    }
440}
441
442// ─── EngineState (= the locked thing) ──────────────────────────────────────
443
444/// The single `Mutex`-guarded blob of engine flow state, accessed only
445/// through `Engine::with_state` (see the R1-R4 discipline documented
446/// there).
447#[derive(Debug)]
448pub struct EngineState {
449    /// All known tasks, keyed by `TaskId`.
450    pub tasks: HashMap<TaskId, TaskState>,
451    /// All attached/detached sessions, keyed by `SessionId`.
452    pub sessions: HashMap<SessionId, OperatorSession>,
453    /// Per-`(task_id, attempt)` prompt/directive text, seeded from
454    /// `TaskSpec.initial_directive` and fetched via `fetch_prompt`.
455    pub prompts: HashMap<(TaskId, u32), String>,
456    /// Per-attempt `system_prompt`: `AgentDef.profile.system_prompt` is
457    /// baked at compile time, rendered inside `OperatorSpawner::spawn`,
458    /// and stashed here for the SubAgent to fetch alongside its prompt via
459    /// `HTTP /v1/worker/prompt`. The value is `Option<String>` so a missing
460    /// profile can be distinguished: an absent key means "not yet baked",
461    /// while `Some(None)` means "baked and profile is explicitly absent".
462    pub systems: HashMap<(TaskId, u32), Option<String>>,
463    /// All minted `CapToken` records, keyed by token nonce.
464    pub tokens: HashMap<String, CapTokenRecord>, // key = token nonce
465    /// Short worker handle (`wh-XXXXXXXX`, 12 chars) → token-nonce lookup
466    /// map. Resolves the `worker_handle` field a SubAgent receives with its
467    /// prompt. There is no signature verification: `task_id` is resolved by
468    /// a plain `HashMap` lookup — deliberately thin for the local
469    /// running over WebSocket, and adopted specifically to remove the
470    /// base64 copy-paste failure mode.
471    pub worker_handles: HashMap<String, String>,
472    /// Outstanding `query_senior` suspensions awaiting `resume`.
473    pub pending_resumes: HashMap<ResumeKey, ResumePending>,
474    /// Per-task notifier — `notify_waiters` fires on every task-status
475    /// change. Used by `poll_task` on the caller side, and by callers that
476    /// need to `await` again after detach/reattach.
477    pub task_notifies: HashMap<TaskId, Arc<Notify>>,
478    /// Arbitrary named resources set via `set_resource` and read via
479    /// `fetch_data`.
480    pub resources: HashMap<String, Value>,
481    /// Per-attempt output-event log. The `SpawnerAdapter` appends via
482    /// `submit_output`; the dispatch path pulls the terminal
483    /// `OutputEvent::Final` off the tail and decides Pass / Blocked.
484    pub output_store: HashMap<(TaskId, u32), Vec<crate::worker::output::OutputEvent>>,
485    /// Bounded in-process tail of recent `Event`s (most recent last),
486    /// trimmed to `event_log_max` by `push_event`.
487    pub event_log_tail: Vec<Event>,
488    /// Maximum length of `event_log_tail` before older entries are
489    /// dropped.
490    pub event_log_max: usize,
491}
492
493impl EngineState {
494    /// Construct an empty `EngineState` with `event_log_max = 1024`.
495    pub fn new() -> Self {
496        Self {
497            tasks: HashMap::new(),
498            sessions: HashMap::new(),
499            prompts: HashMap::new(),
500            systems: HashMap::new(),
501            tokens: HashMap::new(),
502            worker_handles: HashMap::new(),
503            pending_resumes: HashMap::new(),
504            task_notifies: HashMap::new(),
505            resources: HashMap::new(),
506            output_store: HashMap::new(),
507            event_log_tail: Vec::new(),
508            event_log_max: 1024,
509        }
510    }
511
512    /// Ensure a per-task `Notify` exists; return the existing one if any.
513    pub fn ensure_task_notify(&mut self, task_id: &TaskId) -> Arc<Notify> {
514        self.task_notifies
515            .entry(task_id.clone())
516            .or_insert_with(|| Arc::new(Notify::new()))
517            .clone()
518    }
519
520    /// Append `ev` to `event_log_tail`, trimming the oldest entries once
521    /// `event_log_max` is exceeded.
522    pub fn push_event(&mut self, ev: Event) {
523        self.event_log_tail.push(ev);
524        if self.event_log_tail.len() > self.event_log_max {
525            let overflow = self.event_log_tail.len() - self.event_log_max;
526            self.event_log_tail.drain(..overflow);
527        }
528    }
529}
530
531impl Default for EngineState {
532    fn default() -> Self {
533        Self::new()
534    }
535}