//! # output_store: Data-plane support layer
//!
//! **This module sits on the Data side of the Data (Big Response handling)
//! vs. Domain (Flow / verdict) split.** It owns the machinery for shuttling big
//! response bodies (4k-token-scale text / file paths / blobs) between
//! SubAgents, and stays out of the Engine's flow control (the BLOCKED/PASS
//! `if`/`else` verdicts). The Domain path (`engine.rs` `submit_output` /
//! `output_tail` / dispatch) is left untouched.
//!
//! ## Why (Data / Domain separation)
//!
//! The Engine's `output_store` `HashMap` plus the `Final.ok` extraction in
//! the dispatch path already cover the Domain (verdict flow). Pushing Big
//! Response payloads (LLM answers of several kilotokens, intermediate files,
//! large blobs) through the same path floods the MainAI context after only a
//! handful of SubAgents and turns the return-text channel into a file-path
//! junk drawer.
//!
//! Resolution: **MainAI only carries an `OutputRef` (a small `out_id`); this
//! module owns the big bodies.** The Domain (flow control) stays inside the
//! Engine, the Data plane (Big Response handling) is handled entirely by this
//! module and its paired `SpawnerLayer`s, and the two do not interfere.
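/*!
A minimal, std-only sketch of the ref-instead-of-body handoff described
above. It assumes nothing about this crate's API: `BodyStore`, `emit`, and
`resolve` are hypothetical stand-ins, and a plain counter replaces
`OutputRef::new()`. What it illustrates is that MainAI's transcript grows by
one short id per SubAgent, however large the body behind that id is.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the Data-plane store: it owns the big bodies.
struct BodyStore {
    next: u64,
    bodies: HashMap<String, String>,
}

impl BodyStore {
    fn new() -> Self {
        BodyStore { next: 0, bodies: HashMap::new() }
    }

    // SubAgent side (EMIT): hand the body over, get a short ref back.
    fn emit(&mut self, body: String) -> String {
        self.next += 1;
        let id = format!("out-{:010x}", self.next);
        self.bodies.insert(id.clone(), body);
        id
    }

    // Next-spawn side: resolve a ref into the body it stands for.
    fn resolve(&self, id: &str) -> Option<&str> {
        self.bodies.get(id).map(String::as_str)
    }
}
```

In this sketch, emitting a 16,000-character body adds only a 14-character id
(`out-` plus 10 hex digits) to MainAI's transcript; the body stays in the
store until the next spawn resolves it.
*/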
//!
//! Note that the Sub/Main-Agent split is not just a context-size trick: it is
//! a support scaffold for MainAI, and that scaffold is what keeps a pure Flow
//! orchestrator from becoming either rigid or brittle. Data-plane offloading
//! is one of the things the scaffold needs in order to work.
//!
//! ## Architecture (three lifecycle axes)
//!
//! Data handling is split along three lifecycles. Mixing them degrades into
//! Agent hardcoding, non-portability, or unmanaged growth:
//!
//! - **LC1 (Agent authoring, Swarm-independent):** the Agent contract in
//!   `Agent.md` speaks in terms of `$IN_REFS` and a single EMIT tool. It does
//!   not know Swarm-specific paths or ids.
//! - **LC2 (Agent execution, Swarm = runtime environment):** at spawn time
//!   the runtime injects the environment (`$IN_REFS` = the previous `out_id`
//!   list, plus the EMIT tool and its token). The SubAgent POSTs directly to
//!   the store, bypassing MainAgent.
//! - **LC3 (Swarm management, this module = Data owner):** intake (EMIT) →
//!   allocate `OutputRef` → register → optional disk persistence.
//!   `get(out_id)` feeds the next spawn's `IN_REFS`.
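/*!
A sketch of the LC1/LC2 boundary, under two loudly stated assumptions: that
`$IN_REFS` is encoded as a comma-separated list, and that the EMIT
credential travels in a variable named `EMIT_TOKEN`. Neither the encoding nor
that variable name is fixed by this module; both are hypothetical. The point
is that the Agent side only ever parses `$IN_REFS`, so it needs no
Swarm-specific paths.

```rust
use std::collections::HashMap;

// LC2 (runtime side): build the env for the next SubAgent spawn.
// Assumptions: IN_REFS is comma-separated; EMIT_TOKEN is a hypothetical name.
fn spawn_env(prev_out_ids: &[&str], emit_token: &str) -> HashMap<String, String> {
    let mut env = HashMap::new();
    env.insert("IN_REFS".to_string(), prev_out_ids.join(","));
    env.insert("EMIT_TOKEN".to_string(), emit_token.to_string());
    env
}

// LC1 (Agent side): read the refs without knowing where they came from.
fn parse_in_refs(raw: &str) -> Vec<String> {
    raw.split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(String::from)
        .collect()
}
```

A process-based spawner could then hand the map to the child with
`std::process::Command::envs`.
*/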
//!
//! ## Discipline
//!
//! - **SubAgent → MainAgent direct return is forbidden.** Big bodies never
//!   ride the return text; MainAgent only holds an `OutputRef` (small id).
//! - **The write path is the EMIT tool, once.** No file-side channel, no
//!   smuggling through return text; the goal is to remove the "LLM forgets at
//!   the tail of the task" failure mode by construction.
//! - **Same-shape `SpawnerLayer` pattern.** All intake and injection flow
//!   through `middleware/sink.rs` / `middleware/input_inject.rs` (both
//!   `SpawnerLayer` impls), the same shape as `AgentResolver` /
//!   `ProjectNameAliasLayer`.
//! - **Multi-in / multi-out is the default assumption**, even when current
//!   traffic is one or two refs. All handling goes through the sink pattern.
//! - **Zero change to engine core.** Only additive Data-plane wiring; the
//!   Domain path (`submit_output` / `output_tail` / dispatch verdict) stays as
//!   it was.
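/*!
A sketch of the same-shape layering the Discipline list relies on. The real
`SpawnerLayer` signature lives in `middleware/` and is not shown in this
file, so everything below (`Spawner`, `SpawnRequest`, `SpawnReply`,
`InputInject`, `Sink`, `Leaf`) is hypothetical. An inject layer adds
`IN_REFS` on the way in, a sink layer collects emitted refs on the way out,
and neither touches the transport `ok` flag that dispatch consults.

```rust
use std::cell::RefCell;
use std::collections::HashMap;

// Hypothetical request/reply shapes; not this crate's types.
struct SpawnRequest {
    agent: String,
    env: HashMap<String, String>,
}

struct SpawnReply {
    out_ids: Vec<String>,
    ok: bool,
}

// Hypothetical minimal spawner trait that every layer implements.
trait Spawner {
    fn spawn(&self, req: SpawnRequest) -> SpawnReply;
}

// Inject layer: adds IN_REFS on the way in, then delegates.
struct InputInject<S> {
    inner: S,
    in_refs: Vec<String>,
}

impl<S: Spawner> Spawner for InputInject<S> {
    fn spawn(&self, mut req: SpawnRequest) -> SpawnReply {
        req.env.insert("IN_REFS".to_string(), self.in_refs.join(","));
        self.inner.spawn(req)
    }
}

// Sink layer: records emitted refs on the way out; `ok` passes through untouched.
struct Sink<S> {
    inner: S,
    seen: RefCell<Vec<String>>,
}

impl<S: Spawner> Spawner for Sink<S> {
    fn spawn(&self, req: SpawnRequest) -> SpawnReply {
        let reply = self.inner.spawn(req);
        self.seen.borrow_mut().extend(reply.out_ids.iter().cloned());
        reply
    }
}

// Leaf spawner for the example: remembers the IN_REFS it saw, emits one ref.
struct Leaf {
    saw_in_refs: RefCell<Option<String>>,
}

impl Spawner for Leaf {
    fn spawn(&self, req: SpawnRequest) -> SpawnReply {
        *self.saw_in_refs.borrow_mut() = req.env.get("IN_REFS").cloned();
        SpawnReply { out_ids: vec![format!("out-{}", req.agent)], ok: true }
    }
}
```

Because every layer implements the same trait, layers stack in any order
(`Sink { inner: InputInject { inner: Leaf { .. }, .. }, .. }`) without the
leaf knowing they exist.
*/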
//!
//! ## History
//!
//! The former standalone `mlua-swarm-output-store` crate was folded into
//! `engine-core` as a module (a Repository/Store sibling to `issue_store` /
//! `blueprint_store`): independent distribution, separate ownership, and
//! dependency isolation did not justify a crate boundary. The duplicate
//! `OutputEvent` / `ContentRef` definitions in `worker/output.rs` were
//! absorbed here as the canonical ones, and `worker/output.rs` was narrowed to
//! re-exports plus the engine-specific `OutputSink` / `EngineSink`.

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::Mutex;

/// Errors surfaced by the output store layer.
#[derive(Debug, Error)]
pub enum OutputStoreError {
    /// The given `out_id` is not present in the store.
    #[error("output not found: {0}")]
    NotFound(String),
    /// Internal invariant violation (i.e. an implementation bug).
    #[error("internal: {0}")]
    Internal(String),
}

/// Reference handle for a stored output (the id carried by `IN_REFS` at LC2).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct OutputRef(
    /// The `out-`-prefixed short id string.
    pub String,
);

impl OutputRef {
    /// Allocate a fresh short reference (`out-` + 10 hex chars).
    ///
    /// Uses the same in-process-unique id form as `wh-` worker handles
    /// (`types::uid_hex`). Short ids are a deliberate trade-off: legible and
    /// cheap to carry in prompts, but not unguessable. Access control is the
    /// auth gate's job, not the id's.
    pub fn new() -> Self {
        OutputRef(format!("out-{}", crate::types::uid_hex(5)))
    }
}

impl Default for OutputRef {
    fn default() -> Self {
        Self::new()
    }
}
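
/*
Sketch (hypothetical, not this crate's code): `crate::types::uid_hex` is not
shown in this file, so the stand-in below only illustrates the property the
`OutputRef::new` doc relies on. A process-wide atomic counter, masked to
`bytes` bytes and zero-padded, yields ids that are unique within one process
and short enough for prompts. They are also sequential and therefore
trivially guessable, which is consistent with leaving access control to the
auth gate. The real `uid_hex` may generate its bits differently.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Process-wide counter: every call observes a distinct value.
static NEXT_UID: AtomicU64 = AtomicU64::new(0);

// Hypothetical stand-in for `uid_hex(bytes)`: returns `2 * bytes` lowercase
// hex chars, unique within one process until 2^(8 * bytes) ids are issued.
fn uid_hex(bytes: u32) -> String {
    assert!((1..=8).contains(&bytes), "1..=8 bytes fit in a u64");
    let n = NEXT_UID.fetch_add(1, Ordering::Relaxed);
    let mask = if bytes == 8 { u64::MAX } else { (1u64 << (8 * bytes)) - 1 };
    format!("{:0width$x}", n & mask, width = 2 * bytes as usize)
}

// Mirrors `OutputRef::new`: `out-` + uid_hex(5), i.e. 10 hex chars.
fn new_out_ref() -> String {
    format!("out-{}", uid_hex(5))
}
```
*/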

/// A single output event submitted from a worker into the engine.
///
/// Since `WorkerResult` was folded into this enum, it is the only worker
/// event type. The `SpawnerAdapter` is responsible for turning the wire form
/// (stdout / NDJSON / file path / IPC) into this typed representation at the
/// boundary.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum OutputEvent {
    /// Progress marker (stage name / note); carries no payload.
    Progress {
        /// Stage name.
        stage: String,
        /// Optional note.
        note: Option<String>,
    },
    /// Streaming chunk (LLM tokens, partial JSON, etc.).
    Partial {
        /// The chunk itself.
        chunk: ContentRef,
    },
    /// Named artifact (file / blob / intermediate product).
    Artifact {
        /// Artifact name.
        name: String,
        /// Artifact body.
        content: ContentRef,
    },
    /// Terminal event (the former `WorkerResult`). Exactly one per attempt,
    /// emitted last.
    Final {
        /// Output body.
        content: ContentRef,
        /// Transport-level success flag. This is the only piece of information
        /// the engine's dispatch path consults for flow control. Domain-level
        /// verdicts (e.g. `"blocked"`) live as plain data inside `content` and
        /// are consumed by Flow.ir conds (`Eq($.<step>.verdict, Lit(..))`).
        ok: bool,
    },
}
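
/*
Sketch (hypothetical, not this crate's code): the `Final` docs state a
per-attempt invariant, exactly one terminal event and emitted last. A
consumer could check it along these lines. `Kind` is a payload-free mirror of
`OutputEvent`'s variants so the example stays std-only, and the returned
`bool` plays the role of the transport-level `ok` flag.

```rust
// Payload-free mirror of `OutputEvent` (hypothetical, for illustration).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    Progress,
    Partial,
    Artifact,
    Final { ok: bool },
}

// Returns the attempt's `ok` flag if the stream ends with exactly one
// `Final`; otherwise describes the violation.
fn terminal_ok(events: &[Kind]) -> Result<bool, String> {
    let finals = events.iter().filter(|k| matches!(k, Kind::Final { .. })).count();
    match (finals, events.last()) {
        (1, Some(Kind::Final { ok })) => Ok(*ok),
        (0, _) => Err("no Final event".to_string()),
        (1, _) => Err("Final is not the last event".to_string()),
        (n, _) => Err(format!("{n} Final events, expected exactly one")),
    }
}
```

Domain verdicts such as `"blocked"` are deliberately absent here: they live
inside `content`, not in the event shape.
*/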

/// How content travels: as an inline value or as a file path. Streaming is
/// not carried as its own variant in this iteration.
///
/// The `SpawnerAdapter` picks the appropriate variant at the boundary. When
/// metadata is unavailable it is acceptable to fall back to `Inline` with the
/// raw value, prioritising basic functionality over metadata fidelity.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ContentRef {
    /// Inline JSON: meant for kilobyte-scale payloads, and the default for
    /// structured data.
    Inline {
        /// JSON body.
        value: Value,
    },
    /// File-path handoff for large / binary / artifact content. File
    /// ownership and cleanup belong to the engine side (carry); the spawner
    /// only hands the path over.
    FileRef {
        /// File path.
        path: PathBuf,
        /// MIME hint.
        mime: Option<String>,
        /// Size hint in bytes.
        size_hint: Option<u64>,
    },
}

impl ContentRef {
    /// Wrap a `serde_json::Value` as an `Inline` content ref.
    pub fn inline(value: Value) -> Self {
        ContentRef::Inline { value }
    }

    /// Wrap raw text as an `Inline` content ref (the common path for a
    /// `ProcessSpawner` running in plain mode).
    pub fn inline_text(text: impl Into<String>) -> Self {
        ContentRef::Inline {
            value: Value::String(text.into()),
        }
    }

    /// `FileRef` helper. Fill in `mime` / `size_hint` when the spawner knows
    /// them.
    pub fn file_ref(
        path: impl Into<PathBuf>,
        mime: Option<String>,
        size_hint: Option<u64>,
    ) -> Self {
        ContentRef::FileRef {
            path: path.into(),
            mime,
            size_hint,
        }
    }
}
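
/*
Sketch (hypothetical, not this crate's code): the `ContentRef` docs leave the
Inline-vs-FileRef choice to the `SpawnerAdapter`. One plausible policy is a
size threshold. The 64 KiB cutoff, the `Content` mirror type, the
`text/plain` MIME hint, and the `choose` function are all assumptions. Small
text stays inline; larger text goes to a caller-supplied `spill` function
that persists it, and only the path plus hints crosses the boundary.

```rust
use std::io;
use std::path::PathBuf;

// Hypothetical cutoff; not a value taken from this crate.
const INLINE_MAX_BYTES: usize = 64 * 1024;

// Std-only mirror of `ContentRef` (the real one carries a `serde_json::Value`).
#[derive(Debug, PartialEq)]
enum Content {
    Inline { text: String },
    FileRef { path: PathBuf, mime: Option<String>, size_hint: Option<u64> },
}

// Keep small text inline; hand large text to `spill`, which persists it and
// returns the path. The adapter knows the size, so it fills in `size_hint`.
fn choose<F>(text: &str, spill: F) -> io::Result<Content>
where
    F: FnOnce(&str) -> io::Result<PathBuf>,
{
    if text.len() <= INLINE_MAX_BYTES {
        return Ok(Content::Inline { text: text.to_string() });
    }
    let path = spill(text)?;
    Ok(Content::FileRef {
        path,
        mime: Some("text/plain".to_string()),
        size_hint: Some(text.len() as u64),
    })
}
```

Passing `spill` in keeps the policy testable and leaves file ownership where
the docs put it, on the engine side.
*/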

/// Metadata for one registered output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutputRecord {
    /// Allocated id.
    pub id: OutputRef,
    /// Producing task.
    pub task_id: String,
    /// Attempt number.
    pub attempt: u32,
    /// Producing agent name.
    pub producer_agent: String,
    /// The event itself.
    pub event: OutputEvent,
    /// Parent output refs (chained ids received via handoff).
    pub parent_refs: Vec<OutputRef>,
}
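
/*
Sketch (hypothetical, not this crate's code): `parent_refs` links records
into a provenance graph. A breadth-first walk such as `lineage` below
recovers every ancestor of an output and visits each one once, even when two
branches share a parent, which the multi-in assumption makes likely. A plain
`HashMap` from id to parent ids stands in for repeated `OutputStore::get`
calls.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

// id -> parent ids; a stand-in for reading `OutputRecord::parent_refs`.
type Parents = HashMap<String, Vec<String>>;

// Breadth-first ancestor walk. Unknown ids simply have no parents here;
// a store-backed version would have to decide whether that is an error.
fn lineage(parents: &Parents, start: &str) -> Vec<String> {
    let mut seen: HashSet<String> = HashSet::new();
    let mut queue: VecDeque<String> = VecDeque::new();
    let mut order = Vec::new();
    queue.push_back(start.to_string());
    while let Some(id) = queue.pop_front() {
        for p in parents.get(&id).into_iter().flatten() {
            // `insert` returns false for an ancestor already reached via another branch.
            if seen.insert(p.clone()) {
                order.push(p.clone());
                queue.push_back(p.clone());
            }
        }
    }
    order
}
```
*/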

/// The LC3 (Swarm management) interface.
///
/// Backing implementations are pluggable (in-memory, SQLite, filesystem,
/// etc.). The MVP ships only the in-memory backend; SQLite / filesystem
/// backends are a future carry.
#[async_trait]
pub trait OutputStore: Send + Sync {
    /// Intake an event, allocate an id, and register the record. Returns the
    /// freshly allocated ref.
    async fn append(
        &self,
        task_id: &str,
        attempt: u32,
        producer_agent: &str,
        event: OutputEvent,
        parent_refs: Vec<OutputRef>,
    ) -> Result<OutputRef, OutputStoreError>;

    /// Look up a record by id (LC2 `IN_REFS` resolution: the value handed to
    /// the next spawn on handoff).
    async fn get(&self, id: &OutputRef) -> Result<OutputRecord, OutputStoreError>;

    /// Look up the **latest** record emitted under the given producer name
    /// (`out_name` addressing, the logical, agent-based sibling of `get`).
    /// Names are producer-scoped, not task-scoped; the newest emit wins.
    async fn get_latest_by_name(&self, name: &str) -> Result<OutputRecord, OutputStoreError>;

    /// List every record for a given `(task_id, attempt)` pair. Used where
    /// the dispatch path pulls the verdict view.
    async fn list_for_attempt(
        &self,
        task_id: &str,
        attempt: u32,
    ) -> Result<Vec<OutputRecord>, OutputStoreError>;
}
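
/*
Sketch (hypothetical, not this crate's code): on the LC2 side, a spawner
resolving `$IN_REFS` calls `get` once per ref, so under the multi-in
assumption it has to decide what a dangling ref means. The version below
fails fast on the first unknown id. `Lookup` and `LookupError` are a
synchronous, string-keyed narrowing of `OutputStore::get` and
`OutputStoreError::NotFound`, chosen so the example runs without an async
runtime.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum LookupError {
    NotFound(String),
}

// Hypothetical synchronous narrowing of `OutputStore::get`.
trait Lookup {
    fn lookup(&self, id: &str) -> Result<String, LookupError>;
}

impl Lookup for HashMap<String, String> {
    fn lookup(&self, id: &str) -> Result<String, LookupError> {
        self.get(id)
            .cloned()
            .ok_or_else(|| LookupError::NotFound(id.to_string()))
    }
}

// Resolve every ref in order. Collecting into `Result` stops at the first
// missing ref, so in this sketch one dangling entry fails the whole spawn.
fn resolve_in_refs<L: Lookup>(store: &L, refs: &[&str]) -> Result<Vec<String>, LookupError> {
    refs.iter().map(|r| store.lookup(r)).collect()
}
```

A lenient alternative would skip missing refs and report them separately;
which one fits depends on whether a downstream Agent can run on partial
input.
*/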

/// MVP implementation: in-memory, and the default for tests and prototyping.
///
/// Production deployments swap in a SQLite or filesystem backend (carry).
#[derive(Debug, Default, Clone)]
pub struct InMemoryOutputStore {
    inner: Arc<Mutex<InMemoryInner>>,
}

#[derive(Debug, Default)]
struct InMemoryInner {
    /// Primary index: id → record.
    by_id: HashMap<OutputRef, OutputRecord>,
    /// (task_id, attempt) → refs in insertion order.
    by_attempt: HashMap<(String, u32), Vec<OutputRef>>,
    /// producer_agent → emitted refs in insertion order (last = latest).
    by_name: HashMap<String, Vec<OutputRef>>,
}

impl InMemoryOutputStore {
    /// Construct a fresh, empty store.
    pub fn new() -> Self {
        Self::default()
    }
}

#[async_trait]
impl OutputStore for InMemoryOutputStore {
    async fn append(
        &self,
        task_id: &str,
        attempt: u32,
        producer_agent: &str,
        event: OutputEvent,
        parent_refs: Vec<OutputRef>,
    ) -> Result<OutputRef, OutputStoreError> {
        let id = OutputRef::new();
        let record = OutputRecord {
            id: id.clone(),
            task_id: task_id.to_string(),
            attempt,
            producer_agent: producer_agent.to_string(),
            event,
            parent_refs,
        };
        let mut guard = self.inner.lock().await;
        guard.by_id.insert(id.clone(), record);
        guard
            .by_attempt
            .entry((task_id.to_string(), attempt))
            .or_default()
            .push(id.clone());
        guard
            .by_name
            .entry(producer_agent.to_string())
            .or_default()
            .push(id.clone());
        Ok(id)
    }

    async fn get(&self, id: &OutputRef) -> Result<OutputRecord, OutputStoreError> {
        let guard = self.inner.lock().await;
        guard
            .by_id
            .get(id)
            .cloned()
            .ok_or_else(|| OutputStoreError::NotFound(id.0.clone()))
    }

    async fn get_latest_by_name(&self, name: &str) -> Result<OutputRecord, OutputStoreError> {
        let guard = self.inner.lock().await;
        let latest = guard
            .by_name
            .get(name)
            .and_then(|ids| ids.last())
            .ok_or_else(|| OutputStoreError::NotFound(name.to_string()))?;
        guard
            .by_id
            .get(latest)
            .cloned()
            .ok_or_else(|| OutputStoreError::Internal(format!("name index dangling: {name}")))
    }

    async fn list_for_attempt(
        &self,
        task_id: &str,
        attempt: u32,
    ) -> Result<Vec<OutputRecord>, OutputStoreError> {
        let guard = self.inner.lock().await;
        let ids = guard
            .by_attempt
            .get(&(task_id.to_string(), attempt))
            .cloned()
            .unwrap_or_default();
        let mut out = Vec::with_capacity(ids.len());
        for id in ids {
            if let Some(r) = guard.by_id.get(&id) {
                out.push(r.clone());
            }
        }
        Ok(out)
    }
}
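
/*
Sketch (hypothetical, not this crate's code): the same three-index scheme
(`by_id`, `by_attempt`, `by_name`) without `tokio` or `async_trait`, behind a
single `std::sync::Mutex`. `SyncStore`, `Record`, and their methods are
illustrative names only. As in `InMemoryOutputStore::append`, all three
indexes are updated under one lock, so a reader never sees a name or attempt
entry whose record is missing; that is why the "name index dangling" error
should be unreachable in practice.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Clone, Debug, PartialEq)]
struct Record {
    id: u64,
    task_id: String,
    attempt: u32,
    producer: String,
    body: String,
}

#[derive(Default)]
struct Indexes {
    next_id: u64,
    by_id: HashMap<u64, Record>,
    by_attempt: HashMap<(String, u32), Vec<u64>>,
    by_name: HashMap<String, Vec<u64>>, // insertion order: last = latest
}

#[derive(Default)]
struct SyncStore {
    inner: Mutex<Indexes>,
}

impl SyncStore {
    fn append(&self, task_id: &str, attempt: u32, producer: &str, body: &str) -> u64 {
        // One lock for all three indexes, so they never disagree.
        let mut g = self.inner.lock().unwrap();
        g.next_id += 1;
        let id = g.next_id;
        let record = Record {
            id,
            task_id: task_id.to_string(),
            attempt,
            producer: producer.to_string(),
            body: body.to_string(),
        };
        g.by_id.insert(id, record);
        g.by_attempt.entry((task_id.to_string(), attempt)).or_default().push(id);
        g.by_name.entry(producer.to_string()).or_default().push(id);
        id
    }

    // Producer-scoped: the newest emit wins, whichever task produced it.
    fn latest_by_name(&self, name: &str) -> Option<Record> {
        let g = self.inner.lock().unwrap();
        let id = *g.by_name.get(name)?.last()?;
        g.by_id.get(&id).cloned()
    }

    fn list_for_attempt(&self, task_id: &str, attempt: u32) -> Vec<Record> {
        let g = self.inner.lock().unwrap();
        let ids = g.by_attempt.get(&(task_id.to_string(), attempt));
        ids.into_iter()
            .flatten()
            .filter_map(|id| g.by_id.get(id).cloned())
            .collect()
    }
}
```
*/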

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn append_then_get_roundtrip() {
        let store = InMemoryOutputStore::new();
        let event = OutputEvent::Final {
            content: ContentRef::Inline {
                value: Value::String("hello".into()),
            },
            ok: true,
        };
        let id = store
            .append("task-1", 1, "agent-a", event, vec![])
            .await
            .expect("append");
        let got = store.get(&id).await.expect("get");
        assert_eq!(got.id, id);
        assert_eq!(got.task_id, "task-1");
        assert_eq!(got.attempt, 1);
        assert_eq!(got.producer_agent, "agent-a");
        match got.event {
            OutputEvent::Final { ok, .. } => assert!(ok),
            _ => panic!("wrong event variant"),
        }
    }

    #[tokio::test]
    async fn list_for_attempt_orders_by_insertion() {
        let store = InMemoryOutputStore::new();
        let e1 = OutputEvent::Progress {
            stage: "s1".into(),
            note: None,
        };
        let e2 = OutputEvent::Progress {
            stage: "s2".into(),
            note: None,
        };
        let id1 = store.append("t", 1, "a", e1, vec![]).await.expect("append");
        let id2 = store.append("t", 1, "a", e2, vec![]).await.expect("append");
        let list = store.list_for_attempt("t", 1).await.expect("list");
        assert_eq!(list.len(), 2);
        assert_eq!(list[0].id, id1);
        assert_eq!(list[1].id, id2);
    }

    // Purely synchronous: no async runtime needed.
    #[test]
    fn out_ref_is_short_prefixed_form() {
        let r = OutputRef::new();
        assert!(r.0.starts_with("out-"), "prefix: {}", r.0);
        let hex = &r.0["out-".len()..];
        assert_eq!(hex.len(), 10, "10 hex chars: {}", r.0);
        assert!(hex.chars().all(|c| c.is_ascii_hexdigit()), "hex: {}", r.0);
    }

    #[tokio::test]
    async fn get_latest_by_name_returns_newest_emit() {
        let store = InMemoryOutputStore::new();
        let e = |s: &str| OutputEvent::Progress {
            stage: s.into(),
            note: None,
        };
        store
            .append("t", 1, "agent-a", e("first"), vec![])
            .await
            .expect("append 1");
        // Same producer, different task: names are producer-scoped, so this one wins.
        let id2 = store
            .append("t2", 1, "agent-a", e("second"), vec![])
            .await
            .expect("append 2");
        // A later emit by a different producer must not shadow agent-a's latest.
        store
            .append("t", 1, "agent-b", e("other"), vec![])
            .await
            .expect("append 3");
        let got = store.get_latest_by_name("agent-a").await.expect("by name");
        assert_eq!(got.id, id2, "latest emit wins");
        assert_eq!(got.task_id, "t2");
    }

    #[tokio::test]
    async fn get_latest_by_name_unknown_returns_not_found() {
        let store = InMemoryOutputStore::new();
        let err = store.get_latest_by_name("nobody").await.unwrap_err();
        assert!(matches!(err, OutputStoreError::NotFound(_)));
    }

    #[tokio::test]
    async fn get_not_found_returns_error() {
        let store = InMemoryOutputStore::new();
        let missing = OutputRef("missing".into());
        let err = store.get(&missing).await.unwrap_err();
        assert!(matches!(err, OutputStoreError::NotFound(_)));
    }

    #[tokio::test]
    async fn parent_refs_are_persisted() {
        let store = InMemoryOutputStore::new();
        let parent = OutputRef::new();
        let event = OutputEvent::Final {
            content: ContentRef::Inline { value: Value::Null },
            ok: true,
        };
        let id = store
            .append("t", 1, "a", event, vec![parent.clone()])
            .await
            .expect("append");
        let got = store.get(&id).await.expect("get");
        assert_eq!(got.parent_refs, vec![parent]);
    }
}