Skip to main content

mlua_swarm/store/output/
mod.rs

1//! # output_store — Data-plane support layer
2//!
3//! **Module built on the Data (Big Response handling) vs. Domain (Flow / verdict)
4//! separation axis.** Owns the machinery for shuttling big response bodies
5//! (4k-token-scale text / file paths / blobs) between SubAgents, and stays out
6//! of the Engine's flow control (the BLOCKED/PASS `if`/`else` verdicts). The
7//! Domain path — `engine.rs` `submit_output` / `output_tail` / dispatch — is
8//! left untouched.
9//!
10//! ## Why (Data / Domain separation)
11//!
12//! The Engine's `output_store` `HashMap` plus `Final.ok` extraction in
13//! the dispatch path already covers the Domain (verdict flow). Pushing Big Response
14//! payloads (LLM answers of several kilotokens, intermediate files, large
15//! blobs) through the same path floods MainAI context after only a handful of
16//! SubAgents and turns the return-text channel into a file-path junk drawer.
17//!
18//! Resolution: **MainAI only carries `OutputRef` (a small `out_id`); this
19//! module owns the big bodies.** The Domain (flow control) stays inside the
20//! Engine, the Data plane (Big Response handling) is completed by this module
21//! and its paired `SpawnerLayer`s, and the two do not interfere.
22//!
23//! Note that the Sub/Main-Agent split is not just a context-size trick — it is
24//! a support scaffold for MainAI, which is what keeps a pure Flow orchestrator
25//! from becoming either rigid or brittle. Data-plane offloading is one of the
26//! things that scaffold needs to work.
27//!
28//! ## Architecture (three lifecycle axes)
29//!
30//! Data handling is cut along three lifecycles. Mixing them collapses into
31//! Agent hardcoding, non-portability, or unmanaged growth:
32//!
33//! - **LC1 — Agent authoring (Swarm-independent):** the Agent contract in
34//!   `Agent.md` speaks in terms of `$IN_REFS` and a single EMIT tool. It does
35//!   not know Swarm-specific paths or ids.
36//! - **LC2 — Agent execution (Swarm = runtime environment):** at spawn time
37//!   the runtime injects env (`$IN_REFS` = previous `out_id` list, EMIT tool
38//!   plus token). The SubAgent POSTs directly to the store, bypassing MainAgent.
39//! - **LC3 — Swarm management (this module = Data owner):** intake (EMIT) →
40//!   allocate `OutputRef` → register → optional disk persistence. `get(out_id)`
41//!   feeds the next spawn's `IN_REFS`.
42//!
43//! ## Discipline
44//!
45//! - **SubAgent → MainAgent direct return is forbidden.** Big bodies never
46//!   ride the return text; MainAgent only holds an `OutputRef` (small id).
47//! - **Write path is the EMIT tool, once.** No file-side channel, no smuggling
48//!   through return text — the goal is to remove the "LLM forgets at the tail
49//!   of the task" failure mode by construction.
50//! - **Same-shape `SpawnerLayer` pattern.** All intake / inject flows through
51//!   `middleware/sink.rs` / `middleware/input_inject.rs` (both `SpawnerLayer`
52//!   impls). Same shape as `AgentResolver` / `ProjectNameAliasLayer`.
53//! - **Multi-in / multi-out is the default assumption**, even when the current
54//!   traffic is one or two refs. All handling goes through the sink pattern.
55//! - **Zero change to engine core.** Only additive Data-plane wiring; the
56//!   Domain path (`submit_output` / `output_tail` / dispatch verdict) stays as
57//!   it was.
58//!
59//! ## History
60//!
61//! The former standalone `mlua-swarm-output-store` crate was folded into
62//! `engine-core` as a module (a Repository/Store sibling to `issue_store` /
63//! `blueprint_store`). Independent distribution, separate ownership, and
64//! dependency isolation did not justify the crate boundary. The duplicated
65//! `OutputEvent` / `ContentRef` in `worker/output.rs` were absorbed here
66//! (canonical), and `worker/output.rs` was narrowed to re-exports plus the
67//! engine-specific `OutputSink` / `EngineSink`.
68
69pub mod sqlite;
70pub use sqlite::SqliteOutputStore;
71
72use async_trait::async_trait;
73use serde::{Deserialize, Serialize};
74use serde_json::Value;
75use std::collections::HashMap;
76use std::path::PathBuf;
77use std::sync::Arc;
78use thiserror::Error;
79use tokio::sync::Mutex;
80
81/// Errors surfaced by the output store layer.
82#[derive(Debug, Error)]
83pub enum OutputStoreError {
84    /// The given `out_id` is not present in the store.
85    #[error("output not found: {0}")]
86    NotFound(String),
87    /// Internal invariant violation (i.e. an implementation bug).
88    #[error("internal: {0}")]
89    Internal(String),
90}
91
92/// Reference handle for a stored output (the id carried by `IN_REFS` at LC2).
93#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
94pub struct OutputRef(
95    /// The `out-`-prefixed short id string.
96    pub String,
97);
98
99impl OutputRef {
100    /// Allocate a fresh short reference (`out-` + 10 hex chars).
101    ///
102    /// Uses the same in-process-unique id form as `wh-` worker handles
103    /// (`types::uid_hex`). Short ids are a deliberate trade: legible / cheap
104    /// to carry in prompts, not unguessable — access control is the auth
105    /// gate's job, not the id's.
106    pub fn new() -> Self {
107        OutputRef(format!("out-{}", crate::types::uid_hex(5)))
108    }
109}
110
111impl Default for OutputRef {
112    fn default() -> Self {
113        Self::new()
114    }
115}
116
117/// A single output event submitted from a worker into the engine.
118///
119/// The only event type after `WorkerResult` was folded into this enum. The
120/// `SpawnerAdapter` is responsible for turning the wire form (stdout / NDJSON /
121/// file path / IPC) into this typed representation at the boundary.
122#[derive(Debug, Clone, Serialize, Deserialize)]
123#[serde(tag = "type", rename_all = "snake_case")]
124pub enum OutputEvent {
125    /// Progress marker (state name / note); carries no payload.
126    Progress {
127        /// Stage name.
128        stage: String,
129        /// Optional note.
130        note: Option<String>,
131    },
132    /// Streaming chunk (LLM tokens, partial JSON, etc.).
133    Partial {
134        /// The chunk itself.
135        chunk: ContentRef,
136    },
137    /// Named artifact (file / blob / intermediate product).
138    Artifact {
139        /// Artifact name.
140        name: String,
141        /// Artifact body.
142        content: ContentRef,
143    },
144    /// Terminal event (the former `WorkerResult`). Exactly one per attempt,
145    /// emitted last.
146    Final {
147        /// Output body.
148        content: ContentRef,
149        /// Transport-level success flag. This is the only piece of information
150        /// the engine's dispatch path consults for flow control. Domain-level
151        /// verdicts (e.g. `"blocked"`) live as plain data inside `content` and
152        /// are consumed by Flow.ir conds (`Eq($.<step>.verdict, Lit(..))`).
153        ok: bool,
154    },
155}
156
157/// How content travels — inline value or file path. Streaming is not carried
158/// as its own variant in this iteration.
159///
160/// The `SpawnerAdapter` picks the appropriate variant at the boundary. When
161/// metadata is unavailable it is acceptable to fall back to `Inline` with the
162/// raw value, prioritising basic functionality over metadata fidelity.
163#[derive(Debug, Clone, Serialize, Deserialize)]
164#[serde(tag = "kind", rename_all = "snake_case")]
165pub enum ContentRef {
166    /// Inline JSON. Kilobyte-scale, the default for structured data.
167    Inline {
168        /// JSON body.
169        value: Value,
170    },
171    /// File-path handoff for large / binary / artifact content. File
172    /// ownership and cleanup belong to the engine side (carry); the spawner
173    /// only hands the path over.
174    FileRef {
175        /// File path.
176        path: PathBuf,
177        /// MIME hint.
178        mime: Option<String>,
179        /// Size hint in bytes.
180        size_hint: Option<u64>,
181    },
182}
183
184impl ContentRef {
185    /// Wrap a `serde_json::Value` as an `Inline` content ref.
186    pub fn inline(value: Value) -> Self {
187        ContentRef::Inline { value }
188    }
189
190    /// Wrap raw text as an `Inline` content ref (the common path for a
191    /// `ProcessSpawner` running in plain mode).
192    pub fn inline_text(text: impl Into<String>) -> Self {
193        ContentRef::Inline {
194            value: Value::String(text.into()),
195        }
196    }
197
198    /// `FileRef` helper. Fill in `mime` / `size_hint` when the spawner knows
199    /// them.
200    pub fn file_ref(
201        path: impl Into<PathBuf>,
202        mime: Option<String>,
203        size_hint: Option<u64>,
204    ) -> Self {
205        ContentRef::FileRef {
206            path: path.into(),
207            mime,
208            size_hint,
209        }
210    }
211}
212
213/// Metadata for one registered output.
214#[derive(Debug, Clone, Serialize, Deserialize)]
215pub struct OutputRecord {
216    /// Allocated id.
217    pub id: OutputRef,
218    /// Producing task.
219    pub task_id: String,
220    /// Attempt number.
221    pub attempt: u32,
222    /// Producing agent name.
223    pub producer_agent: String,
224    /// The event itself.
225    pub event: OutputEvent,
226    /// Parent output refs (chained ids received via handoff).
227    pub parent_refs: Vec<OutputRef>,
228}
229
230/// The LC3 (Swarm management) interface.
231///
232/// Backing implementations are pluggable (in-memory, SQLite, filesystem,
233/// etc.). The MVP ships only the in-memory backend; SQLite / filesystem
234/// backends are a future carry.
235#[async_trait]
236pub trait OutputStore: Send + Sync {
237    /// Intake an event, allocate an id, and register the record. Returns the
238    /// freshly allocated ref.
239    async fn append(
240        &self,
241        task_id: &str,
242        attempt: u32,
243        producer_agent: &str,
244        event: OutputEvent,
245        parent_refs: Vec<OutputRef>,
246    ) -> Result<OutputRef, OutputStoreError>;
247
248    /// Look up a record by id (LC2 `IN_REFS` resolution — the value handed
249    /// to the next spawn on handoff).
250    async fn get(&self, id: &OutputRef) -> Result<OutputRecord, OutputStoreError>;
251
252    /// Look up the **latest** record emitted under the given producer name
253    /// (`out_name` addressing — the logical, agent-based sibling of `get`).
254    /// Names are producer-scoped, not task-scoped: the newest emit wins.
255    async fn get_latest_by_name(&self, name: &str) -> Result<OutputRecord, OutputStoreError>;
256
257    /// List every record for a given `(task_id, attempt)` pair. Used where
258    /// the dispatch path pulls the verdict view.
259    async fn list_for_attempt(
260        &self,
261        task_id: &str,
262        attempt: u32,
263    ) -> Result<Vec<OutputRecord>, OutputStoreError>;
264}
265
266/// MVP implementation — in-memory, and the default for tests and prototyping.
267///
268/// Production deployments swap in a SQLite or filesystem backend (carry).
269#[derive(Debug, Default, Clone)]
270pub struct InMemoryOutputStore {
271    inner: Arc<Mutex<InMemoryInner>>,
272}
273
274#[derive(Debug, Default)]
275struct InMemoryInner {
276    by_id: HashMap<OutputRef, OutputRecord>,
277    by_attempt: HashMap<(String, u32), Vec<OutputRef>>,
278    /// producer_agent → emitted refs in insertion order (last = latest).
279    by_name: HashMap<String, Vec<OutputRef>>,
280}
281
282impl InMemoryOutputStore {
283    /// Construct a fresh, empty store.
284    pub fn new() -> Self {
285        Self::default()
286    }
287}
288
289#[async_trait]
290impl OutputStore for InMemoryOutputStore {
291    async fn append(
292        &self,
293        task_id: &str,
294        attempt: u32,
295        producer_agent: &str,
296        event: OutputEvent,
297        parent_refs: Vec<OutputRef>,
298    ) -> Result<OutputRef, OutputStoreError> {
299        let id = OutputRef::new();
300        let record = OutputRecord {
301            id: id.clone(),
302            task_id: task_id.to_string(),
303            attempt,
304            producer_agent: producer_agent.to_string(),
305            event,
306            parent_refs,
307        };
308        let mut guard = self.inner.lock().await;
309        guard.by_id.insert(id.clone(), record);
310        guard
311            .by_attempt
312            .entry((task_id.to_string(), attempt))
313            .or_default()
314            .push(id.clone());
315        guard
316            .by_name
317            .entry(producer_agent.to_string())
318            .or_default()
319            .push(id.clone());
320        Ok(id)
321    }
322
323    async fn get(&self, id: &OutputRef) -> Result<OutputRecord, OutputStoreError> {
324        let guard = self.inner.lock().await;
325        guard
326            .by_id
327            .get(id)
328            .cloned()
329            .ok_or_else(|| OutputStoreError::NotFound(id.0.clone()))
330    }
331
332    async fn get_latest_by_name(&self, name: &str) -> Result<OutputRecord, OutputStoreError> {
333        let guard = self.inner.lock().await;
334        let latest = guard
335            .by_name
336            .get(name)
337            .and_then(|ids| ids.last())
338            .ok_or_else(|| OutputStoreError::NotFound(name.to_string()))?;
339        guard
340            .by_id
341            .get(latest)
342            .cloned()
343            .ok_or_else(|| OutputStoreError::Internal(format!("name index dangling: {name}")))
344    }
345
346    async fn list_for_attempt(
347        &self,
348        task_id: &str,
349        attempt: u32,
350    ) -> Result<Vec<OutputRecord>, OutputStoreError> {
351        let guard = self.inner.lock().await;
352        let ids = guard
353            .by_attempt
354            .get(&(task_id.to_string(), attempt))
355            .cloned()
356            .unwrap_or_default();
357        let mut out = Vec::with_capacity(ids.len());
358        for id in ids {
359            if let Some(r) = guard.by_id.get(&id) {
360                out.push(r.clone());
361            }
362        }
363        Ok(out)
364    }
365}
366
367#[cfg(test)]
368mod tests {
369    use super::*;
370
371    #[tokio::test]
372    async fn append_then_get_roundtrip() {
373        let store = InMemoryOutputStore::new();
374        let event = OutputEvent::Final {
375            content: ContentRef::Inline {
376                value: Value::String("hello".into()),
377            },
378            ok: true,
379        };
380        let id = store
381            .append("task-1", 1, "agent-a", event.clone(), vec![])
382            .await
383            .expect("append");
384        let got = store.get(&id).await.expect("get");
385        assert_eq!(got.id, id);
386        assert_eq!(got.task_id, "task-1");
387        assert_eq!(got.attempt, 1);
388        assert_eq!(got.producer_agent, "agent-a");
389        match got.event {
390            OutputEvent::Final { ok, .. } => assert!(ok),
391            _ => panic!("wrong event variant"),
392        }
393    }
394
395    #[tokio::test]
396    async fn list_for_attempt_orders_by_insertion() {
397        let store = InMemoryOutputStore::new();
398        let e1 = OutputEvent::Progress {
399            stage: "s1".into(),
400            note: None,
401        };
402        let e2 = OutputEvent::Progress {
403            stage: "s2".into(),
404            note: None,
405        };
406        let id1 = store.append("t", 1, "a", e1, vec![]).await.expect("append");
407        let id2 = store.append("t", 1, "a", e2, vec![]).await.expect("append");
408        let list = store.list_for_attempt("t", 1).await.expect("list");
409        assert_eq!(list.len(), 2);
410        assert_eq!(list[0].id, id1);
411        assert_eq!(list[1].id, id2);
412    }
413
414    #[tokio::test]
415    async fn out_ref_is_short_prefixed_form() {
416        let r = OutputRef::new();
417        assert!(r.0.starts_with("out-"), "prefix: {}", r.0);
418        let hex = &r.0["out-".len()..];
419        assert_eq!(hex.len(), 10, "10 hex chars: {}", r.0);
420        assert!(hex.chars().all(|c| c.is_ascii_hexdigit()), "hex: {}", r.0);
421    }
422
423    #[tokio::test]
424    async fn get_latest_by_name_returns_newest_emit() {
425        let store = InMemoryOutputStore::new();
426        let e = |s: &str| OutputEvent::Progress {
427            stage: s.into(),
428            note: None,
429        };
430        store
431            .append("t", 1, "agent-a", e("first"), vec![])
432            .await
433            .expect("append 1");
434        let id2 = store
435            .append("t2", 1, "agent-a", e("second"), vec![])
436            .await
437            .expect("append 2");
438        // no producer bleed-through between attempts
439        store
440            .append("t", 1, "agent-b", e("other"), vec![])
441            .await
442            .expect("append 3");
443        let got = store.get_latest_by_name("agent-a").await.expect("by name");
444        assert_eq!(got.id, id2, "latest emit wins");
445        assert_eq!(got.task_id, "t2");
446    }
447
448    #[tokio::test]
449    async fn get_latest_by_name_unknown_returns_not_found() {
450        let store = InMemoryOutputStore::new();
451        let err = store.get_latest_by_name("nobody").await.unwrap_err();
452        assert!(matches!(err, OutputStoreError::NotFound(_)));
453    }
454
455    #[tokio::test]
456    async fn get_not_found_returns_error() {
457        let store = InMemoryOutputStore::new();
458        let missing = OutputRef("missing".into());
459        let err = store.get(&missing).await.unwrap_err();
460        assert!(matches!(err, OutputStoreError::NotFound(_)));
461    }
462
463    #[tokio::test]
464    async fn parent_refs_are_persisted() {
465        let store = InMemoryOutputStore::new();
466        let parent = OutputRef::new();
467        let event = OutputEvent::Final {
468            content: ContentRef::Inline { value: Value::Null },
469            ok: true,
470        };
471        let id = store
472            .append("t", 1, "a", event, vec![parent.clone()])
473            .await
474            .expect("append");
475        let got = store.get(&id).await.expect("get");
476        assert_eq!(got.parent_refs, vec![parent]);
477    }
478}